risingwave_connector/sink/file_sink/opendal_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_pb::id::ExecutorId;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use uuid::Uuid;
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
/// The `FileSink` struct represents a file sink that uses the `OpendalSinkBackend` trait for its backend implementation.
///
/// # Type Parameters
///
/// - `S`: The concrete implementation of the `OpendalSinkBackend` trait used by this file sink.
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    /// The path to the file where the sink writes data.
    pub(crate) path: String,
    /// The schema describing the structure of the data being written to the file sink.
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    /// The batching strategy for sinking data to files.
    pub(crate) batching_strategy: BatchingStrategy,

    /// The description of the sink's format.
    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

/// The `OpendalSinkBackend` trait unifies the behavior of various sink backends
/// implemented through `OpenDAL` (<https://github.com/apache/opendal>).
///
/// # Type Parameters
///
/// - `Properties`: Represents the necessary parameters for establishing a backend.
///
/// # Constants
///
/// - `SINK_NAME`: A static string representing the name of the sink.
///
/// # Functions
///
/// - `from_btreemap`: Parses the required parameters from the options of the `CREATE SINK` statement.
/// - `new_operator`: Creates a new operator using the provided backend properties.
/// - `get_path`: Returns the path of the sink file specified in the user's `CREATE SINK` statement.
/// - `get_engine_type`: Returns the engine type backing this sink.
/// - `get_batching_strategy`: Returns the batching strategy parsed from the backend properties.
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}
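
// A minimal sketch of what a backend implementation looks like, assuming a hypothetical
// `FooProperties` type and helper methods (`parse`, `build_operator`); the real backends are
// the per-engine modules behind `EngineType` (S3, GCS, Fs, Azblob, Webhdfs, Snowflake):
//
//     #[derive(Debug, Clone, PartialEq)]
//     pub struct FooSink;
//
//     impl OpendalSinkBackend for FooSink {
//         type Properties = FooProperties;
//         const SINK_NAME: &'static str = "foo";
//
//         fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
//             FooProperties::parse(btree_map) // hypothetical parsing helper
//         }
//         fn new_operator(properties: Self::Properties) -> Result<Operator> {
//             properties.build_operator() // hypothetical: builds an `opendal::Operator`
//         }
//         fn get_path(properties: Self::Properties) -> String {
//             properties.path.clone()
//         }
//         fn get_engine_type() -> EngineType {
//             EngineType::Fs
//         }
//         fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
//             properties.batching_strategy.clone()
//         }
//     }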

#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if matches!(self.engine_type, EngineType::Snowflake) {
            risingwave_common::license::Feature::SnowflakeSink
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                    Please change the query to append-only, and specify it \
                    explicitly after the `FORMAT ... ENCODE ...` statement. \
                    For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone());
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config);
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: ExecutorId,
    unique_writer_id: Uuid,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

/// The `FileWriterEnum` enum represents different types of file writers used for various sink
/// implementations.
///
/// # Variants
///
/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter<W>`
///   for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
///   as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
/// - `FileWriter`: Represents a basic `OpenDAL` writer, for writing files in encodings other than Parquet.
///
/// The choice of writer used during the actual writing process depends on the encode type of the sink.
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

/// Public interface exposed to `BatchingLogSinker`, used to write chunks and commit files.
impl OpenDalSinkWriter {
    /// Writes a chunk, creating the underlying file writer first if none exists.
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

    /// Closes the current writer, finishing the in-progress file, and returns whether a file
    /// was actually committed (`false` if there was no open writer).
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {} (executor_id: {}, created_time: {}) finished writing file, metadata: {:?}",
                            self.unique_writer_id,
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

    /// Tries to commit; only commits if the batching condition is met.
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}
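
// A rough sketch of how a caller such as `BatchingLogSinker` can drive the interface above
// (illustrative only, not the actual log-consumption loop):
//
//     // for each chunk read from the upstream log:
//     writer.write_batch(chunk).await?;
//     // then check whether a batching threshold (row count or rollover time) has been hit:
//     if writer.try_commit().await? {
//         // a file was finished and closed in the target storage
//     }
//     // `commit()` can also be called directly to force the current file to be closed.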

/// Private methods related to batching.
impl OpenDalSinkWriter {
    /// Returns whether the batching condition is met, i.e. the writer has been open longer
    /// than `rollover_seconds` or has buffered at least `max_row_count` rows.
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }
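
    // For example, for a writer created at 2025-06-15 12:34:56 UTC, the match above yields:
    //
    //     PathPartitionPrefix::None  => ""
    //     PathPartitionPrefix::Day   => "2025-06-15/"
    //     PathPartitionPrefix::Month => "/2025-06/"
    //     PathPartitionPrefix::Hour  => "/2025-06-15 12:00/"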

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

    /// Writes a chunk and updates the batching state (`current_batched_row_num`).
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                // Write the JSON representation of each row in the current chunk to `chunk_buf`.
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    // To avoid a temporary string allocation, write directly into `chunk_buf`
                    // via `write_fmt` (which the `writeln!` macro uses).
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap(); // writing to a `BytesMut` should never fail
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

/// Init methods.
impl OpenDalSinkWriter {
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: ExecutorId,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            unique_writer_id: Uuid::now_v7(),
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        // TODO: specify more file suffixes based on encode_type.
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        // With batching in place, the file writing process is decoupled from checkpoints.
        // The current file naming convention is as follows:
        // 1. A subdirectory is chosen based on `path_partition_prefix` (by day, month, hour, or none).
        // 2. The file name includes a unique UUID (v7, which embeds a timestamp) and the creation time
        //    in seconds since the UNIX epoch. If the engine type is `Fs`, the path is handled
        //    automatically and the file name does not include a path prefix.
        // 3. For the Snowflake sink, the `write_path` parameter can be empty; in that case the data
        //    is written to the root of the specified bucket.
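        //
        // For example, with `path_partition_prefix = day` on an S3 sink, the object name has the
        // shape (values illustrative):
        //
        //     <write_path>/2025-06-15/<uuid-v7>_<epoch_seconds>.parquet
        //
        // while an `Fs` sink (or a Snowflake sink with an empty `write_path`) omits the
        // `<write_path>/` prefix.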
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.unique_writer_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

    async fn create_sink_writer(&mut self) -> Result<()> {
        // Update the `created_time` to the current time when creating a new writer.
        self.created_time = SystemTime::now();

        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder().set_compression(Compression::SNAPPY);
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;

        Ok(())
    }
}

fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        // This assert is to make sure there is no duplicate field name in the schema.
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field = IcebergArrowConvert.to_arrow_field(&rw_field.name, &rw_field.data_type)?;

        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

/// `BatchingStrategy` represents the strategy for batching data before writing to files.
///
/// This struct contains settings that control how data is collected and
/// partitioned based on specified criteria:
///
/// - `max_row_count`: Maximum number of rows to accumulate before a file is finished
///   (optional; defaults to `DEFAULT_MAX_ROW_COUNT`, 10240).
/// - `rollover_seconds`: Time interval (in seconds) after which a file is finished,
///   regardless of the number of accumulated rows (optional; defaults to
///   `DEFAULT_ROLLOVER_SECONDS`, 10).
/// - `path_partition_prefix`: Specifies how files are organized into directories
///   based on creation time (e.g., by day, month, or hour).
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}
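
// Because every field goes through `DisplayFromStr`, a `BatchingStrategy` can be deserialized
// from all-string options; a minimal sketch using `serde_json` (illustrative, not the code
// path actually used for `CREATE SINK` options):
//
//     let strategy: BatchingStrategy = serde_json::from_value(serde_json::json!({
//         "max_row_count": "4096",
//         "rollover_seconds": "60",
//         "path_partition_prefix": "day",
//     }))
//     .unwrap();
//     assert_eq!(strategy.max_row_count, 4096);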

/// `PathPartitionPrefix` defines the granularity of file partitions based on creation time.
///
/// Each variant specifies how files are organized into directories:
/// - `None`: No partitioning.
/// - `Day`: Files are written in a directory for each day.
/// - `Month`: Files are written in a directory for each month.
/// - `Hour`: Files are written in a directory for each hour.
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}
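
// A small illustrative test (not part of the original file): with `EnumString` / `Display`
// and `serialize_all = "snake_case"`, the partition prefix variants round-trip through their
// lowercase names.
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    #[test]
    fn path_partition_prefix_round_trips_snake_case() {
        // `FromStr` comes from the `EnumString` derive; "day" parses to `Day`.
        assert_eq!(
            PathPartitionPrefix::from_str("day").unwrap(),
            PathPartitionPrefix::Day
        );
        // `Display` comes from the `Display` derive; `Hour` renders as "hour".
        assert_eq!(PathPartitionPrefix::Hour.to_string(), "hour");
    }
}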