risingwave_connector/sink/file_sink/opendal_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
/// The `FileSink` struct represents a file sink that uses the `OpendalSinkBackend` trait for its backend implementation.
///
/// # Type Parameters
///
/// - `S`: The concrete implementation of the `OpendalSinkBackend` trait used by this file sink.
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    /// The path to the file where the sink writes data.
    pub(crate) path: String,
    /// The schema describing the structure of the data being written to the file sink.
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    /// The batching strategy for sinking data to files.
    pub(crate) batching_strategy: BatchingStrategy,

    /// The description of the sink's format.
    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

/// The `OpendalSinkBackend` trait unifies the behavior of various sink backends
/// implemented through `OpenDAL` (<https://github.com/apache/opendal>).
///
/// # Type Parameters
///
/// - `Properties`: The parameters required to establish a backend.
///
/// # Constants
///
/// - `SINK_NAME`: A static string representing the name of the sink.
///
/// # Functions
///
/// - `from_btreemap`: Parses the required parameters from the input `CREATE SINK` statement.
/// - `new_operator`: Creates a new operator using the provided backend properties.
/// - `get_path`: Returns the sink path specified in the user's `CREATE SINK` statement.
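///
/// # Example
///
/// A minimal sketch of a hypothetical backend implementation; `MyFsSink`, `MyFsProperties`,
/// and its fields are illustrative and not part of this crate:
///
/// ```ignore
/// #[derive(Clone, PartialEq)]
/// struct MyFsSink;
///
/// impl OpendalSinkBackend for MyFsSink {
///     type Properties = MyFsProperties;
///     const SINK_NAME: &'static str = "my_fs";
///
///     fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
///         // Parse the WITH-clause options into `MyFsProperties`.
///         todo!()
///     }
///     fn new_operator(properties: Self::Properties) -> Result<Operator> {
///         // Build an `opendal::Operator` for the target storage service.
///         todo!()
///     }
///     fn get_path(properties: Self::Properties) -> String {
///         properties.path
///     }
///     fn get_engine_type() -> EngineType {
///         EngineType::Fs
///     }
///     fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
///         properties.batching_strategy
///     }
/// }
/// ```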
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if matches!(self.engine_type, EngineType::Snowflake) {
            risingwave_common::license::Feature::SnowflakeSink
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                    Please change the query to append-only, and specify it \
                    explicitly after the `FORMAT ... ENCODE ...` statement. \
                    For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone());
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

/// The `FileWriterEnum` enum represents different types of file writers used for various sink
/// implementations.
///
/// # Variants
///
/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter<W>`
///   for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
///   as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
/// - `FileWriter`: Represents a basic `OpenDAL` writer, used for writing files in encodings other than Parquet.
///
/// The choice of writer used during the actual writing process depends on the encode type of the sink.
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

/// Public interface exposed to `BatchingLogSinker`, used to write chunks and commit files.
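///
/// A hypothetical caller could drive these methods roughly as follows; the actual scheduling
/// (when to write and when to try to commit) lives in `BatchingLogSinker`, so this is only an
/// illustrative sketch:
///
/// ```ignore
/// writer.write_batch(chunk).await?;
/// // Commit opportunistically once the batching thresholds are reached ...
/// let _committed = writer.try_commit().await?;
/// // ... or force the current file to be finished unconditionally.
/// let _committed = writer.commit().await?;
/// ```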
impl OpenDalSinkWriter {
    /// This method writes a chunk.
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

    /// This method closes the current writer, finishes writing the file, and returns whether
    /// the commit succeeded.
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

    // Tries to commit if the batching condition is met.
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

/// Private methods related to batching.
impl OpenDalSinkWriter {
    /// Determines whether the batching condition is met.
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

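    /// Formats the directory prefix derived from the writer's creation time according to
    /// `path_partition_prefix`. For a creation time of 2025-06-01 13:00 UTC this yields, for
    /// example, `""` for `None`, `"2025-06-01/"` for `Day`, `"/2025-06/"` for `Month`, and
    /// `"/2025-06-01 13:00/"` for `Hour`.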
    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

    // Writes a chunk and updates the related batching state.
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                // Write the JSON representations of the row(s) in the current chunk to `chunk_buf`.
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    // To avoid a temporary string allocation, write directly into `chunk_buf`
                    // via `write_fmt`.
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap(); // writing to a `BytesMut` should never fail
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

/// Init methods.
impl OpenDalSinkWriter {
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        // TODO: specify more file suffixes based on `encode_type`.
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        // With batching in place, the file writing process is decoupled from checkpoints.
        // The current file naming convention is as follows:
        // 1. A subdirectory is defined based on `path_partition_prefix` (e.g., by day, month, hour, or none).
        // 2. The file name includes the `executor_id` and the creation time in seconds since the UNIX epoch.
        //    If the engine type is `Fs`, the path is handled automatically, and the filename does not include a path prefix.
        // 3. For the Snowflake sink, the `write_path` parameter can be empty.
        //    When `write_path` is not specified, the data is written to the root of the specified bucket.
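        // For example (illustrative values): with `path_partition_prefix = day`, an
        // `executor_id` of 3, a creation time of 1700000000 seconds (2023-11-14 UTC), and a
        // `write_path` of "warehouse/events" on S3, the object name would be
        // "warehouse/events/2023-11-14/3_1700000000.parquet".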
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder().set_compression(Compression::SNAPPY);
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;

        self.created_time = SystemTime::now();

        Ok(())
    }
}

fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        // This assert is to make sure there is no duplicate field name in the schema.
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field =
            IcebergArrowConvert.to_arrow_field(&rw_field.name, &rw_field.data_type)?;

        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

/// `BatchingStrategy` represents the strategy for batching data before writing to files.
///
/// This struct contains settings that control how data is collected and
/// partitioned based on specified criteria:
///
/// - `max_row_count`: Maximum number of rows to accumulate before a write is triggered
///   (defaults to `DEFAULT_MAX_ROW_COUNT`).
/// - `rollover_seconds`: Time interval (in seconds) after which a write is triggered,
///   regardless of the number of accumulated rows (defaults to `DEFAULT_ROLLOVER_SECONDS`).
/// - `path_partition_prefix`: Specifies how files are organized into directories
///   based on creation time (e.g., by day, month, or hour).
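///
/// # Example
///
/// An illustrative sketch of how the string-typed options are deserialized; the real values
/// come from the `CREATE SINK` WITH clause, and `serde_json` is used here only for demonstration:
///
/// ```ignore
/// let strategy: BatchingStrategy = serde_json::from_value(serde_json::json!({
///     // Numeric values arrive as strings and are parsed via `DisplayFromStr`.
///     "max_row_count": "100000",
///     "rollover_seconds": "60",
///     // Parsed into `PathPartitionPrefix::Day`.
///     "path_partition_prefix": "day"
/// }))
/// .unwrap();
/// ```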
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

/// `PathPartitionPrefix` defines the granularity of file partitions based on creation time.
///
/// Each variant specifies how files are organized into directories:
/// - `None`: No partitioning.
/// - `Day`: Files are written in a directory for each day.
/// - `Month`: Files are written in a directory for each month.
/// - `Hour`: Files are written in a directory for each hour.
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}