risingwave_connector/sink/file_sink/opendal_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

/// Default number of seconds after which an in-progress file is rolled over.
pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
/// Default maximum number of rows buffered before an in-progress file is finished.
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}

pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}

/// The `FileSink` struct represents a file sink that uses the `OpendalSinkBackend` trait for its backend implementation.
///
/// # Type Parameters
///
/// - S: The concrete implementation of the `OpendalSinkBackend` trait used by this file sink.
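///
/// # Example
///
/// A hedged sketch of how a `FileSink` is typically constructed and checked. The backend
/// type `S` and the `sink_param` value are placeholders; in practice this happens inside
/// the sink framework via `TryFrom<SinkParam>` and `Sink::validate`:
///
/// ```ignore
/// // Build the sink from the parameters of the `CREATE SINK` statement.
/// let sink = FileSink::<S>::try_from(sink_param)?;
/// // Reject non-append-only queries and unsupported encodes, and probe the target path.
/// sink.validate().await?;
/// ```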
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    /// The `OpenDAL` operator used to interact with the backing storage.
    pub(crate) op: Operator,
    /// The path to the file where the sink writes data.
    pub(crate) path: String,
    /// The schema describing the structure of the data being written to the file sink.
    pub(crate) schema: Schema,
    /// Whether the sink only receives append-only (`INSERT`) data.
    pub(crate) is_append_only: bool,
    /// The batching strategy for sinking data to files.
    pub(crate) batching_strategy: BatchingStrategy,

    /// The description of the sink's format.
    pub(crate) format_desc: SinkFormatDesc,
    /// The storage engine backing this sink.
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

/// The `OpendalSinkBackend` trait unifies the behavior of various sink backends
/// implemented through `OpenDAL`(`<https://github.com/apache/opendal>`).
///
/// # Type Parameters
///
/// - Properties: Represents the necessary parameters for establishing a backend.
///
/// # Constants
///
/// - `SINK_NAME`: A static string representing the name of the sink.
///
/// # Functions
///
/// - `from_btreemap`: Parses the required parameters from the `WITH` options of the user's `CREATE SINK` statement.
/// - `new_operator`: Creates a new operator using the provided backend properties.
/// - `get_path`: Returns the path of the sink file specified by the user's `CREATE SINK` statement.
/// - `get_engine_type`: Returns the [`EngineType`] of the backend.
/// - `get_batching_strategy`: Returns the [`BatchingStrategy`] configured for the sink.
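///
/// # Example
///
/// A minimal sketch of a hypothetical backend. The `DemoFs` types and the way the
/// properties are deserialized are illustrative only (they are not part of this crate),
/// and the exact `OpenDAL` builder API depends on the `opendal` version in use:
///
/// ```ignore
/// #[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
/// pub struct DemoFsConfig {
///     /// Root directory to write files into.
///     pub path: String,
///     /// Batching options, flattened from the `WITH` clause.
///     #[serde(flatten)]
///     pub batching_strategy: BatchingStrategy,
/// }
///
/// #[derive(Clone, Debug, PartialEq)]
/// pub struct DemoFs;
///
/// impl OpendalSinkBackend for DemoFs {
///     type Properties = DemoFsConfig;
///     const SINK_NAME: &'static str = "demo_fs";
///
///     fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
///         // Round-trip through serde_json to reuse the serde definitions above.
///         serde_json::from_value(serde_json::to_value(btree_map).map_err(|e| anyhow!(e))?)
///             .map_err(|e| SinkError::Config(anyhow!(e)))
///     }
///
///     fn new_operator(properties: Self::Properties) -> Result<Operator> {
///         // Build an OpenDAL operator for the local-filesystem service.
///         let mut builder = opendal::services::Fs::default();
///         builder.root(&properties.path);
///         Ok(Operator::new(builder).map_err(|e| anyhow!(e))?.finish())
///     }
///
///     fn get_path(properties: Self::Properties) -> String {
///         properties.path
///     }
///
///     fn get_engine_type() -> EngineType {
///         EngineType::Fs
///     }
///
///     fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
///         properties.batching_strategy
///     }
/// }
/// ```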
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                    Please change the query to append-only, and specify it \
                    explicitly after the `FORMAT ... ENCODE ...` statement. \
                    For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone());
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    /// Number of rows written into the current in-progress file.
    current_batched_row_num: usize,
    /// Creation time of the writer for the current in-progress file.
    created_time: SystemTime,
}

/// The `FileWriterEnum` enum represents different types of file writers used for various sink
/// implementations.
///
/// # Variants
///
/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter<W>`
///   for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
///   as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
/// - `FileWriter`: Represents a basic `OpenDAL` writer, used for writing files with encodings other than Parquet.
///
/// The choice of writer used during the actual writing process depends on the encode type of the sink.
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

/// Public interface exposed to `BatchingLogSinker`, used to write chunks and commit files.
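///
/// A rough sketch of the expected call pattern (the real driver lives in
/// `BatchingLogSinker`; the names and control flow here are illustrative only):
///
/// ```ignore
/// // Buffer one chunk into the current in-progress file (created lazily).
/// writer.write_batch(chunk).await?;
/// // Finish the file early if `max_row_count` or `rollover_seconds` is reached.
/// if writer.try_commit().await? {
///     // A file was completed; the log sinker may now advance its truncate offset.
/// }
/// // Force-finish the current file, e.g. on an explicit flush.
/// writer.commit().await?;
/// ```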
impl OpenDalSinkWriter {
    /// Writes a chunk, creating a new file writer first if none is open.
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        }
        self.append_only(chunk).await?;
        Ok(())
    }

    /// Closes the current writer and finishes the in-progress file. Returns `Ok(true)` if a
    /// file was finished, or `Ok(false)` if there was no open writer to commit.
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

    /// Tries to commit, i.e. finishes the current file, if the batching condition is met.
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

/// Private methods related to batching.
impl OpenDalSinkWriter {
    /// Judges whether the batching condition is met: either the rollover interval has
    /// elapsed or the buffered row count has reached the limit.
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

    /// Returns the date-based subdirectory prefix (if any) derived from the writer's
    /// creation time, according to `path_partition_prefix`.
    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    /// Returns the number of whole seconds elapsed since the current writer was created.
    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

    /// Writes a chunk in append-only mode and updates the batching state accordingly.
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                // Write the JSON representation of each row in the current chunk to `chunk_buf`.
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    // To avoid a temporary string allocation, write directly into
                    // `chunk_buf` via `write_fmt` (through the `writeln!` macro).
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap(); // writing to a `BytesMut` should never fail
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}


/// Init methods.
impl OpenDalSinkWriter {
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        // TODO: specify more file suffixes based on encode_type.
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        // With batching in place, the file writing process is decoupled from checkpoints.
        // The current file naming convention is as follows:
        // 1. A subdirectory is defined based on `path_partition_prefix` (e.g., by day, hour, month, or none).
        // 2. The file name includes the `executor_id` and the creation time in seconds since the UNIX epoch.
        //    If the engine type is `Fs`, the path is automatically handled, and the file name does not
        //    include a path prefix.
        // 3. For the Snowflake sink, the `write_path` parameter can be empty; when it is not specified,
        //    the data is written to the root of the specified bucket.
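        //
        // For instance (values are illustrative), with `path_partition_prefix = 'day'`, executor id 3,
        // and Parquet encode, an object might be named:
        //     `<write_path>/2025-06-01/3_1748736000.parquet`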
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }


    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder();
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;
        self.created_time = SystemTime::now();

        Ok(())
    }
}

/// Converts a RisingWave [`Schema`] into an Arrow schema (Iceberg flavor) for the Parquet writer.
fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        // This assert is to make sure there is no duplicate field name in the schema.
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field =
            IcebergArrowConvert.to_arrow_field(&rw_field.name, &rw_field.data_type)?;
        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

/// `BatchingStrategy` represents the strategy for batching data before writing to files.
///
/// This struct contains settings that control how data is collected and
/// partitioned based on specified criteria:
///
/// - `max_row_count`: Maximum number of rows to accumulate before finishing a file
///   (defaults to [`DEFAULT_MAX_ROW_COUNT`]).
/// - `rollover_seconds`: Time interval (in seconds) after which the current file is
///   finished, regardless of the number of accumulated rows (defaults to
///   [`DEFAULT_ROLLOVER_SECONDS`]).
/// - `path_partition_prefix`: Optionally organizes files into directories
///   based on their creation time (e.g., by day, month, or hour).
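///
/// A hedged sketch of how these options deserialize from string-valued sink options
/// (the map literal below is illustrative only):
///
/// ```ignore
/// let options = BTreeMap::from([
///     ("max_row_count".to_owned(), "10240".to_owned()),
///     ("rollover_seconds".to_owned(), "10".to_owned()),
///     ("path_partition_prefix".to_owned(), "day".to_owned()),
/// ]);
/// let strategy: BatchingStrategy =
///     serde_json::from_value(serde_json::to_value(options)?)?;
/// assert_eq!(strategy.max_row_count, 10240);
/// ```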
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

/// `PathPartitionPrefix` defines the granularity of file partitions based on creation time.
///
/// Each variant specifies how files are organized into directories:
/// - `None`: No partitioning.
/// - `Day`: Files are written in a directory for each day.
/// - `Month`: Files are written in a directory for each month.
/// - `Hour`: Files are written in a directory for each hour.
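///
/// With the format strings currently used in `path_partition_prefix`, a writer created
/// at 2025-06-01 12:34 UTC would produce the following directory prefixes:
/// - `Day`: `2025-06-01/`
/// - `Month`: `/2025-06/`
/// - `Hour`: `/2025-06-01 12:00/`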
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}