risingwave_connector/sink/file_sink/opendal_sink.rs

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
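/// Append-only file sink over an OpenDAL-backed store. The concrete backend
/// (S3, GCS, Azblob, WebHDFS, the local filesystem, or Snowflake) is chosen by the
/// `OpendalSinkBackend` implementation `S`; files are batched and rolled over
/// according to `batching_strategy`.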
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    pub(crate) path: String,
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    pub(crate) batching_strategy: BatchingStrategy,

    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

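/// The `OpendalSinkBackend` trait abstracts over the supported backends: it parses
/// backend-specific properties from a `BTreeMap`, builds the OpenDAL `Operator`,
/// and reports the write path, engine type, and batching strategy.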
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

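/// Which storage engine a file sink writes to. The engine affects object-path
/// construction and, for Snowflake, the default sink format when none is specified.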
#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if matches!(self.engine_type, EngineType::Snowflake) {
            risingwave_common::license::Feature::SnowflakeSink
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                 Please change the query to append-only, and specify it \
                 explicitly after the `FORMAT ... ENCODE ...` statement. \
                 For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

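/// Builds a `FileSink` from `SinkParam`: parses backend properties, constructs the
/// OpenDAL operator, and resolves the format descriptor. Only the Snowflake engine may
/// omit `FORMAT ... ENCODE ...`, in which case plain JSON is assumed.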
impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone()).clone();
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

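/// Writer used by `BatchingLogSinker`. It lazily opens one object at a time through
/// the OpenDAL operator, appends rows as Parquet or newline-delimited JSON, and tracks
/// the batched row count and file creation time for rollover decisions.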
pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

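/// The writer for the file currently being produced: an `AsyncArrowWriter` layered on
/// the OpenDAL stream for Parquet encode, or the plain OpenDAL writer for JSON.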
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

impl OpenDalSinkWriter {
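    /// Writes a chunk, opening a new file writer first if none is currently open.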
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

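    /// Closes the current file, if any, and resets the batched row count.
    /// Returns `true` if a file was finished and `false` if there was nothing to commit.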
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

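    /// Commits only when the batching thresholds are reached; otherwise returns `Ok(false)`.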
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

impl OpenDalSinkWriter {
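    /// A file may be committed once either batching threshold is hit: the rollover
    /// interval has elapsed since the writer was created, or enough rows have been batched.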
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

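    /// Builds the optional date-based path prefix (day, month, or hour) from the given
    /// creation time; returns an empty string when no prefix is configured.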
    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

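    /// Appends an append-only chunk to the open writer: Parquet chunks are converted to an
    /// Arrow record batch, while JSON chunks are encoded row by row as newline-delimited
    /// JSON. Panics if any op in the chunk is not `Insert`.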
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap();
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

impl OpenDalSinkWriter {
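    /// Creates a writer for one executor: converts the RisingWave schema to an Arrow
    /// schema for Parquet and prepares a `JsonEncoder` for JSON encode. No file is
    /// opened until the first chunk arrives.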
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

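    /// Opens a new object on the backend and returns the raw OpenDAL writer for it.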
    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

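        // Object names follow `<base_path><partition_prefix><executor_id>_<created_secs>.<suffix>`.
        // The `fs` engine (and a Snowflake sink with an empty write path) writes relative to
        // the operator root, so the base path is dropped in those cases.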
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

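    /// Wraps the object writer for the configured encode: an `AsyncArrowWriter` with
    /// SNAPPY compression for Parquet, or the raw OpenDAL writer otherwise. Also resets
    /// the batched row count and the creation time.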
    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder().set_compression(Compression::SNAPPY);
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;

        self.created_time = SystemTime::now();

        Ok(())
    }
}

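/// Converts a RisingWave schema into the Arrow (Iceberg-flavored) schema used by the
/// Parquet writer, asserting that field names are unique.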
fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field =
            IcebergArrowConvert.to_arrow_field(&rw_field.name, &rw_field.data_type)?;
        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

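/// User-facing batching options: a file is committed once `max_row_count` rows have been
/// written or `rollover_seconds` have elapsed since the file was created, and finished
/// files may be grouped under a date-based `path_partition_prefix`.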
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

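/// Granularity of the optional date-based directory prefix prepended to object names:
/// none (the default), per day, per month, or per hour, derived from the writer's
/// creation time.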
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}