// risingwave_connector/sink/file_sink/opendal_sink.rs

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;
pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
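/// File sink that streams append-only chunks to an object store or local file
/// system through an OpenDAL [`Operator`], parameterized by the backend `S`.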
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    pub(crate) path: String,
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    pub(crate) batching_strategy: BatchingStrategy,
    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

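/// Backend abstraction for [`FileSink`]: each backend supplies its property
/// type, builds the OpenDAL [`Operator`], and exposes its write path, engine
/// type, and batching strategy.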
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

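/// The storage service backing a file sink.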
#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if matches!(self.engine_type, EngineType::Snowflake) {
            risingwave_common::license::Feature::SnowflakeSink
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                Please change the query to append-only, and specify it \
                explicitly after the `FORMAT ... ENCODE ...` statement. \
                For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone());
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

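/// Writer that buffers stream chunks into one output file at a time and rolls
/// over to a new file according to the configured [`BatchingStrategy`].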
pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

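/// The underlying file writer: an Arrow writer for `PARQUET` encode, or a raw
/// OpenDAL writer for newline-delimited `JSON`.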
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

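// Write path driven by `BatchingLogSinker`: `write_batch` appends rows to the
// current file (creating it lazily), and `try_commit`/`commit` close the file
// once a batching threshold is reached.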
impl OpenDalSinkWriter {
    /// Writes a chunk into the current file, creating the file writer on first use.
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

    /// Closes the current file if one is open; returns `true` if a file was finished.
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

    /// Commits only when a batching threshold (row count or rollover time) is met.
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

impl OpenDalSinkWriter {
    /// A commit is allowed once the writer has been open longer than
    /// `rollover_seconds` or has buffered at least `max_row_count` rows.
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap();
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

impl OpenDalSinkWriter {
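    /// Builds a writer bound to `write_path`. The underlying file/object writer
    /// is created lazily on the first `write_batch` call.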
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        // Object name layout: `<base_path><partition_prefix><executor_id>_<create_time>.<suffix>`.
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder();
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;
        self.created_time = SystemTime::now();

        Ok(())
    }
}

/// Maps a RisingWave [`Schema`] to an Arrow schema via the Iceberg conversion
/// rules, asserting that field names are unique.
fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field =
            IcebergArrowConvert.to_arrow_field(&rw_field.name, &rw_field.data_type)?;
        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

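/// Per-sink batching knobs: a file is rolled over once it holds `max_row_count`
/// rows or has been open for `rollover_seconds`, and finished files can be laid
/// out under a date/hour partition prefix.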
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

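/// Date-based directory layout for finished files, e.g. `2024-01-01/` for `Day`.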
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}