1use std::collections::{BTreeMap, HashMap};
16use std::fmt::Write;
17use std::marker::PhantomData;
18use std::sync::Arc;
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
21use anyhow::anyhow;
22use bytes::BytesMut;
23use chrono::{TimeZone, Utc};
24use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
25use parquet::arrow::AsyncArrowWriter;
26use parquet::basic::Compression;
27use parquet::file::properties::WriterProperties;
28use risingwave_common::array::arrow::IcebergArrowConvert;
29use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
30use risingwave_common::array::{Op, StreamChunk};
31use risingwave_common::catalog::Schema;
32use risingwave_pb::id::ExecutorId;
33use serde::Deserialize;
34use serde_json::Value;
35use serde_with::{DisplayFromStr, serde_as};
36use strum_macros::{Display, EnumString};
37use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
38use uuid::Uuid;
39use with_options::WithOptions;
40
41use crate::enforce_secret::EnforceSecret;
42use crate::sink::catalog::SinkEncode;
43use crate::sink::encoder::{
44 JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
45 TimestamptzHandlingMode,
46};
47use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
48use crate::sink::{Result, Sink, SinkError, SinkFormatDesc, SinkParam};
49use crate::source::TryFromBTreeMap;
50use crate::with_options::WithOptions;
51
/// Default number of seconds after which an open output file is rolled over.
pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
/// Default maximum number of rows written into one output file before rollover.
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;
/// Misspelled alias of [`DEFAULT_MAX_ROW_COUNT`]; kept so existing callers keep compiling.
pub const DEFAULT_MAX_ROW_COUNR: usize = DEFAULT_MAX_ROW_COUNT;

/// Serde default for `BatchingStrategy::rollover_seconds`.
pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
/// Serde default for `BatchingStrategy::max_row_count`.
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
/// Generic file sink that writes append-only stream chunks to an object store
/// (S3 / GCS / Azblob / WebHDFS / Snowflake) or the local filesystem via OpenDAL.
/// Backend-specific behavior is injected through the marker type `S`.
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    // OpenDAL operator performing all storage I/O for this sink.
    pub(crate) op: Operator,
    // Base directory/prefix under which output files are created.
    pub(crate) path: String,
    // RisingWave schema of the rows this sink receives.
    pub(crate) schema: Schema,
    // File sink only supports append-only; enforced in `Sink::validate`.
    pub(crate) is_append_only: bool,
    // Controls file rollover (max rows / rollover seconds / path partition prefix).
    pub(crate) batching_strategy: BatchingStrategy,

    // FORMAT ... ENCODE ... description; only PARQUET and JSON encodes are accepted.
    pub(crate) format_desc: SinkFormatDesc,
    // Which storage backend this sink targets.
    pub(crate) engine_type: EngineType,
    // Zero-sized marker tying this struct to its backend type `S`.
    pub(crate) _marker: PhantomData<S>,
}
82
// The default (empty) `EnforceSecret` implementation is sufficient here; no extra
// secret-enforcement logic is declared for file sinks in this file.
impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}
84
/// Backend abstraction for [`FileSink`]: each storage flavor implements this trait
/// to supply its configuration type, operator construction, and batching strategy.
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    /// Backend-specific properties parsed from the sink's WITH options.
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    /// Connector name for this backend (used as `Sink::SINK_NAME`).
    const SINK_NAME: &'static str;

    /// Parses the raw WITH options into `Self::Properties`.
    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    /// Builds the OpenDAL operator used for all I/O against this backend.
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    /// Returns the base path/prefix where output files are written.
    fn get_path(properties: Self::Properties) -> String;
    /// Identifies which storage engine this backend targets.
    fn get_engine_type() -> EngineType;
    /// Extracts the batching (file rollover) strategy from the properties.
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}
111
/// Storage engines supported by the file sink. The variant affects object-name
/// construction (`Fs`/`Snowflake` base-path handling), the default format for
/// Snowflake, and the Snowflake license check in `validate`.
#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}
121
122impl<S: OpendalSinkBackend> Sink for FileSink<S> {
123 type LogSinker = BatchingLogSinker;
124
125 const SINK_NAME: &'static str = S::SINK_NAME;
126
127 async fn validate(&self) -> Result<()> {
128 if matches!(self.engine_type, EngineType::Snowflake) {
129 risingwave_common::license::Feature::SnowflakeSink
130 .check_available()
131 .map_err(|e| anyhow::anyhow!(e))?;
132 }
133 if !self.is_append_only {
134 return Err(SinkError::Config(anyhow!(
135 "File sink only supports append-only mode at present. \
136 Please change the query to append-only, and specify it \
137 explicitly after the `FORMAT ... ENCODE ...` statement. \
138 For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
139 )));
140 }
141
142 if self.format_desc.encode != SinkEncode::Parquet
143 && self.format_desc.encode != SinkEncode::Json
144 {
145 return Err(SinkError::Config(anyhow!(
146 "File sink only supports `PARQUET` and `JSON` encode at present."
147 )));
148 }
149
150 match self.op.list(&self.path).await {
151 Ok(_) => Ok(()),
152 Err(e) => Err(anyhow!(e).into()),
153 }
154 }
155
156 async fn new_log_sinker(
157 &self,
158 writer_param: crate::sink::SinkWriterParam,
159 ) -> Result<Self::LogSinker> {
160 let writer = OpenDalSinkWriter::new(
161 self.op.clone(),
162 &self.path,
163 self.schema.clone(),
164 writer_param.executor_id,
165 &self.format_desc,
166 self.engine_type.clone(),
167 self.batching_strategy.clone(),
168 )?;
169 Ok(BatchingLogSinker::new(writer))
170 }
171}
172
173impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
174 type Error = SinkError;
175
176 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
177 let schema = param.schema();
178 let config = S::from_btreemap(param.properties)?;
179 let path = S::get_path(config.clone());
180 let op = S::new_operator(config.clone())?;
181 let batching_strategy = S::get_batching_strategy(config);
182 let engine_type = S::get_engine_type();
183 let format_desc = match param.format_desc {
184 Some(desc) => desc,
185 None => {
186 if let EngineType::Snowflake = engine_type {
187 SinkFormatDesc::plain_json_for_snowflake_only()
188 } else {
189 return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
190 }
191 }
192 };
193 Ok(Self {
194 op,
195 path,
196 schema,
197 is_append_only: param.sink_type.is_append_only(),
198 batching_strategy,
199 format_desc,
200 engine_type,
201 _marker: PhantomData,
202 })
203 }
204}
205
/// Stateful writer used by [`BatchingLogSinker`]: it lazily opens one output file
/// at a time and rolls over to a new file according to `batching_strategy`.
pub struct OpenDalSinkWriter {
    // Arrow schema derived from the RisingWave schema; used for Parquet encoding.
    schema: SchemaRef,
    // OpenDAL operator performing the actual storage writes.
    operator: Operator,
    // Currently open file writer; `None` until the first row arrives after a commit.
    sink_writer: Option<FileWriterEnum>,
    // Base directory/prefix for generated object names.
    write_path: String,
    // Executor id, embedded in debug logs on commit.
    executor_id: ExecutorId,
    // Per-writer UUID embedded in file names to avoid collisions between writers.
    unique_writer_id: Uuid,
    // Encode chosen in FORMAT ... ENCODE ... (Parquet or Json here).
    encode_type: SinkEncode,
    // JSON row encoder, used when `encode_type` is JSON.
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    // Rows written to the currently open file; reset on commit and on writer
    // creation. NOTE: "bached" is a historical misspelling of "batched"; renaming
    // would touch every use site across this file.
    current_bached_row_num: usize,
    // Creation time of the current file; reset in `create_sink_writer` and used
    // both for rollover timing and for the file-name timestamp.
    created_time: SystemTime,
    // Monotonically increasing per-writer sequence number appended to file names.
    file_seq: u64,
}
221
/// Concrete writer for the currently open file: an Arrow-based Parquet writer,
/// or a raw OpenDAL writer used for line-delimited JSON output.
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}
237
238impl OpenDalSinkWriter {
240 pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
242 if self.sink_writer.is_none() {
243 assert_eq!(self.current_bached_row_num, 0);
244 self.create_sink_writer().await?;
245 };
246 self.append_only(chunk).await?;
247 Ok(())
248 }
249
250 pub async fn commit(&mut self) -> Result<bool> {
252 if let Some(sink_writer) = self.sink_writer.take() {
253 match sink_writer {
254 FileWriterEnum::ParquetFileWriter(w) => {
255 let bytes_written = w.bytes_written();
256 if bytes_written > 0 {
257 w.close().await?;
258 tracing::debug!(
259 "writer {} (executor_id: {}, created_time: {}) finish write file, bytes_written: {}",
260 self.unique_writer_id,
261 self.executor_id,
262 self.created_time
263 .duration_since(UNIX_EPOCH)
264 .expect("Time went backwards")
265 .as_secs(),
266 bytes_written
267 );
268 }
269 }
270 FileWriterEnum::FileWriter(mut w) => {
271 w.close().await?;
272 }
273 };
274 self.current_bached_row_num = 0;
275 return Ok(true);
276 }
277 Ok(false)
278 }
279
280 pub fn has_pending_data(&self) -> bool {
282 self.sink_writer.is_some()
283 }
284
285 pub async fn try_commit(&mut self) -> Result<bool> {
287 if self.can_commit() {
288 return self.commit().await;
289 }
290 Ok(false)
291 }
292}
293
294impl OpenDalSinkWriter {
296 fn can_commit(&self) -> bool {
298 self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
299 || self.current_bached_row_num >= self.batching_strategy.max_row_count
300 }
301
302 fn path_partition_prefix(&self, duration: &Duration) -> String {
303 let datetime = Utc
304 .timestamp_opt(duration.as_secs() as i64, 0)
305 .single()
306 .expect("Failed to convert timestamp to DateTime<Utc>")
307 .with_timezone(&Utc);
308 let path_partition_prefix = self
309 .batching_strategy
310 .path_partition_prefix
311 .as_ref()
312 .unwrap_or(&PathPartitionPrefix::None);
313 match path_partition_prefix {
314 PathPartitionPrefix::None => "".to_owned(),
315 PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
316 PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
317 PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
318 }
319 }
320
321 fn duration_seconds_since_writer_created(&self) -> usize {
322 let now = SystemTime::now();
323 now.duration_since(self.created_time)
324 .expect("Time went backwards")
325 .as_secs() as usize
326 }
327
328 async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
330 match self
331 .sink_writer
332 .as_mut()
333 .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
334 {
335 FileWriterEnum::ParquetFileWriter(w) => {
336 let batch =
337 IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
338 let batch_row_nums = batch.num_rows();
339 w.write(&batch).await?;
340 self.current_bached_row_num += batch_row_nums;
341 }
342 FileWriterEnum::FileWriter(w) => {
343 let mut chunk_buf = BytesMut::new();
344 let batch_row_nums = chunk.data_chunk().capacity();
345 for (op, row) in chunk.rows() {
347 assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
348 writeln!(
351 chunk_buf,
352 "{}",
353 Value::Object(self.row_encoder.encode(row)?)
354 )
355 .unwrap(); }
357 w.write(chunk_buf.freeze()).await?;
358 self.current_bached_row_num += batch_row_nums;
359 }
360 }
361 Ok(())
362 }
363}
364
impl OpenDalSinkWriter {
    /// Builds a writer in the "no open file" state; the first `write_batch`
    /// call will open the initial file. Converts the RisingWave schema to an
    /// Arrow schema up front and prepares a JSON row encoder (used when the
    /// encode is JSON).
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: ExecutorId,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        // All temporal types are encoded as strings; timestamptz as UTC strings.
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            // Time-ordered UUID; distinguishes files from different writers.
            unique_writer_id: Uuid::now_v7(),
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_bached_row_num: 0,
            created_time: SystemTime::now(),
            file_seq: 0,
        })
    }

    /// Opens a raw OpenDAL writer for the next output file. Object names look like
    /// `{base_path}{partition_prefix}{writer_uuid}_{create_secs}_{file_seq}.{ext}`,
    /// and `file_seq` is incremented as a side effect.
    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        // File extension follows the configured encode; `validate` already
        // restricted this to Parquet/JSON, so other encodes are unreachable.
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        // `created_time` was just reset by `create_sink_writer`, so this is the
        // creation timestamp of the file being opened.
        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        let object_name = {
            // Fs paths are rooted at the operator; Snowflake may have an empty
            // stage path. Every other case prepends `write_path/`.
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };
            // Use the current sequence number, then advance it for the next file.
            let current_file_seq = self.file_seq;
            self.file_seq = self.file_seq.checked_add(1).expect("file seq overflow");

            format!(
                "{}{}{}_{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.unique_writer_id,
                create_time.as_secs(),
                current_file_seq,
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

    /// Opens a new file and wraps it in the encode-appropriate writer, resetting
    /// the per-file row counter.
    async fn create_sink_writer(&mut self) -> Result<()> {
        // Reset the creation time BEFORE building the object name: the name and
        // the rollover timer are both derived from `created_time`.
        self.created_time = SystemTime::now();

        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                // Snappy-compressed Parquet over the async OpenDAL byte stream.
                let props = WriterProperties::builder().set_compression(Compression::SNAPPY);
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_bached_row_num = 0;

        Ok(())
    }
}
477
478fn convert_rw_schema_to_arrow_schema(
479 rw_schema: risingwave_common::catalog::Schema,
480) -> anyhow::Result<arrow_schema_iceberg::Schema> {
481 let mut schema_fields = HashMap::new();
482 rw_schema.fields.iter().for_each(|field| {
483 let res = schema_fields.insert(&field.name, &field.data_type);
484 assert!(res.is_none())
486 });
487 let mut arrow_fields = vec![];
488 for rw_field in &rw_schema.fields {
489 let arrow_field = IcebergArrowConvert
490 .to_arrow_field(&rw_field.name.clone(), &rw_field.data_type.clone())?;
491
492 arrow_fields.push(arrow_field);
493 }
494
495 Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
496}
497
/// User-configurable batching (file rollover) strategy, parsed from the sink's
/// WITH options. Numeric fields use `DisplayFromStr` because WITH options arrive
/// as a string map.
/// NOTE(review): `#[derive(Default)]` yields zeros for the numeric fields, not
/// the serde defaults below — confirm callers only construct this via deserialization.
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    // Roll over to a new file after this many rows (default: 10240).
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    // Roll over to a new file after it has been open this many seconds (default: 10).
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    // Optional date-based directory prefix for generated file paths.
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}
522
/// Date-based directory layout used as an optional prefix for generated file
/// paths (rendered by `OpenDalSinkWriter::path_partition_prefix`). Parsed from
/// strings via `EnumString`/`Display` with snake_case serialization.
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}