risingwave_connector/sink/file_sink/opendal_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
/// The `FileSink` struct represents a file sink that uses the `OpendalSinkBackend` trait for its backend implementation.
///
/// # Type Parameters
///
/// - S: The type parameter S represents the concrete implementation of the `OpendalSinkBackend` trait used by this file sink.
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    /// The path to the file where the sink writes data.
    pub(crate) path: String,
    /// The schema describing the structure of the data being written to the file sink.
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    /// The batching strategy for sinking data to files.
    pub(crate) batching_strategy: BatchingStrategy,

    /// The description of the sink's format.
    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}
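
// Illustrative only (not part of the original source): a concrete sink is obtained by plugging a
// backend marker type into `FileSink`. Assuming a backend type `S3Sink` that implements
// `OpendalSinkBackend`, the parameterization would look like:
//
//     pub type S3FileSink = FileSink<S3Sink>;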

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

/// The `OpendalSinkBackend` trait unifies the behavior of various sink backends
/// implemented through `OpenDAL`(`<https://github.com/apache/opendal>`).
///
/// # Type Parameters
///
/// - Properties: Represents the necessary parameters for establishing a backend.
///
/// # Constants
///
/// - `SINK_NAME`: A static string representing the name of the sink.
///
/// # Functions
///
/// - `from_btreemap`: Parses the required parameters from the input create sink statement.
/// - `new_operator`: Creates a new operator using the provided backend properties.
/// - `get_path`: Returns the path of the sink file specified by the user's create sink statement.
/// - `get_engine_type`: Returns the engine type of the backend.
/// - `get_batching_strategy`: Returns the batching strategy specified by the user's create sink statement.
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}
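
// An illustrative sketch (not part of the original source) of how a backend could implement this
// trait. `MemorySink` and `MemoryConfig` are hypothetical names used only for this example; the
// real backends (S3, GCS, Azblob, ...) live in the sibling modules of `file_sink`.
//
//     #[derive(Debug, Clone, PartialEq)]
//     pub struct MemorySink;
//
//     impl OpendalSinkBackend for MemorySink {
//         type Properties = MemoryConfig;
//         const SINK_NAME: &'static str = "memory";
//
//         fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
//             // Parse the user's WITH options into the hypothetical `MemoryConfig`.
//             MemoryConfig::from_btreemap(btree_map)
//         }
//         fn new_operator(_properties: Self::Properties) -> Result<Operator> {
//             let op = Operator::new(opendal::services::Memory::default())
//                 .map_err(|e| SinkError::File(e.to_string()))?
//                 .finish();
//             Ok(op)
//         }
//         fn get_path(properties: Self::Properties) -> String {
//             properties.path
//         }
//         fn get_engine_type() -> EngineType {
//             EngineType::Fs
//         }
//         fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
//             properties.batching_strategy
//         }
//     }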

#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if matches!(self.engine_type, EngineType::Snowflake) {
            risingwave_common::license::Feature::SnowflakeSink
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                    Please change the query to append-only, and specify it \
                    explicitly after the `FORMAT ... ENCODE ...` statement. \
                    For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone()).clone();
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

/// The `FileWriterEnum` enum represents different types of file writers used for various sink
/// implementations.
///
/// # Variants
///
/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter<W>`
///   for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
///   as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
/// - `FileWriter`: Represents a basic `OpenDAL` writer, used for writing files in encodings other than Parquet.
///
/// The choice of writer used during the actual writing process depends on the encode type of the sink.
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

/// Public interface exposed to `BatchingLogSinker`, used to write chunks and commit files.
impl OpenDalSinkWriter {
    /// Writes a chunk, creating the underlying file writer first if none exists.
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        }
        self.append_only(chunk).await?;
        Ok(())
    }

    /// Closes the current writer, finishes writing the file, and returns whether the commit succeeded.
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

    /// Tries to commit: closes the current file only if the batching condition is met.
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}
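
// An illustrative sketch (not part of the original source) of how a caller such as
// `BatchingLogSinker` might drive this interface; `chunks` is a hypothetical iterator of
// `StreamChunk`s and error handling is elided:
//
//     for chunk in chunks {
//         writer.write_batch(chunk).await?;
//         // Finish the current file early if a batching threshold has been hit.
//         writer.try_commit().await?;
//     }
//     // Force the remaining rows into a file, e.g. when the log sinker decides to roll over.
//     writer.commit().await?;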

/// Private methods related to batching.
impl OpenDalSinkWriter {
    /// Returns whether the batching condition is met, i.e., whether the current file should be committed.
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

    /// Writes a chunk and updates the related batching state.
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                // Write the JSON representation of each row in the current chunk to `chunk_buf`.
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    // To avoid a temporary string allocation, write directly into `chunk_buf`
                    // via `write_fmt`.
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap(); // writing to a `BytesMut` should never fail
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

/// Init methods.
impl OpenDalSinkWriter {
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        // TODO: specify more file suffixes based on encode_type.
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        // With batching in place, the file writing process is decoupled from checkpoints.
        // The current file naming convention is as follows:
        // 1. A subdirectory may be created according to `path_partition_prefix` (by day, month,
        //    or hour, or none at all).
        // 2. The file name contains the `executor_id` and the creation time in seconds since the
        //    UNIX epoch. If the engine type is `Fs`, the path is handled automatically and the
        //    file name carries no path prefix.
        // 3. For the Snowflake sink, the `write_path` parameter may be empty; in that case the
        //    data is written to the root of the specified bucket.
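        // For illustration only (hypothetical values): with `path_partition_prefix = day`,
        // `executor_id = 42`, and a creation time of 1700000000 seconds since the UNIX epoch,
        // the object name assembled below for a Parquet sink would be
        // `<write_path>/2023-11-14/42_1700000000.parquet`.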
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder();
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;

        self.created_time = SystemTime::now();

        Ok(())
    }
}

fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        // This assert is to make sure there is no duplicate field name in the schema.
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field =
            IcebergArrowConvert.to_arrow_field(&rw_field.name, &rw_field.data_type)?;

        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

/// `BatchingStrategy` represents the strategy for batching data before writing to files.
///
/// This struct contains settings that control how data is collected and
/// partitioned based on specified criteria:
///
/// - `max_row_count`: Maximum number of rows to accumulate before a write is triggered
///   (defaults to `DEFAULT_MAX_ROW_COUNT`, i.e., 10240).
/// - `rollover_seconds`: Time interval (in seconds) after which a write is triggered,
///   regardless of the number of accumulated rows (defaults to `DEFAULT_ROLLOVER_SECONDS`, i.e., 10).
/// - `path_partition_prefix`: Specifies how files are organized into directories
///   based on creation time (e.g., by day, month, or hour).
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}
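
// The test module below is an illustrative sketch added for this write-up and is not part of the
// original source. It only exercises the serde behavior declared above: numeric fields arrive as
// strings (hence `DisplayFromStr`) and missing fields fall back to the declared defaults.
// `serde_json` is assumed to be usable in tests (it is already imported at the top of this file).
#[cfg(test)]
mod batching_strategy_sketch_tests {
    use super::*;

    #[test]
    fn missing_options_fall_back_to_defaults() {
        // An empty options object yields the default batching strategy.
        let strategy: BatchingStrategy = serde_json::from_value(serde_json::json!({})).unwrap();
        assert_eq!(strategy.max_row_count, DEFAULT_MAX_ROW_COUNT);
        assert_eq!(strategy.rollover_seconds, DEFAULT_ROLLOVER_SECONDS);
        assert!(strategy.path_partition_prefix.is_none());
    }

    #[test]
    fn string_valued_options_are_parsed() {
        // Sink options are string-valued, so numbers and the partition prefix are parsed from strings.
        let strategy: BatchingStrategy = serde_json::from_value(serde_json::json!({
            "max_row_count": "4096",
            "rollover_seconds": "60",
            "path_partition_prefix": "day"
        }))
        .unwrap();
        assert_eq!(strategy.max_row_count, 4096);
        assert_eq!(strategy.rollover_seconds, 60);
        assert_eq!(strategy.path_partition_prefix, Some(PathPartitionPrefix::Day));
    }
}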

/// `PathPartitionPrefix` defines the granularity of file partitions based on creation time.
///
/// Each variant specifies how files are organized into directories:
/// - `None`: No partitioning.
/// - `Day`: Files are written in a directory for each day.
/// - `Month`: Files are written in a directory for each month.
/// - `Hour`: Files are written in a directory for each hour.
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,