risingwave_connector/sink/file_sink/opendal_sink.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
/// The `FileSink` struct represents a file sink that uses the `OpendalSinkBackend` trait for its backend implementation.
///
/// # Type Parameters
///
/// - S: The concrete implementation of the `OpendalSinkBackend` trait used by this file sink.
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    /// The path to the file where the sink writes data.
    pub(crate) path: String,
    /// The schema describing the structure of the data being written to the file sink.
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    /// The batching strategy for sinking data to files.
    pub(crate) batching_strategy: BatchingStrategy,

    /// The description of the sink's format.
    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

/// The `OpendalSinkBackend` trait unifies the behavior of various sink backends
/// implemented through `OpenDAL`(`<https://github.com/apache/opendal>`).
///
/// # Type Parameters
///
/// - Properties: Represents the necessary parameters for establishing a backend.
///
/// # Constants
///
/// - `SINK_NAME`: A static string representing the name of the sink.
///
/// # Functions
///
/// - `from_btreemap`: Parses the required parameters from the options of the `CREATE SINK` statement.
/// - `new_operator`: Creates a new operator using the provided backend properties.
/// - `get_path`: Returns the path of the sink file specified in the user's `CREATE SINK` statement.
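///
/// # Example
///
/// A minimal, hypothetical backend sketch (the type names and bodies below are
/// illustrative only; the concrete backends in this crate, such as the S3, GCS and
/// FS sinks, follow the same shape):
///
/// ```ignore
/// #[derive(Clone, PartialEq)]
/// struct MyBackend;
///
/// impl OpendalSinkBackend for MyBackend {
///     // A `WithOptions` struct parsed from the `WITH (...)` clause.
///     type Properties = MyProperties;
///     const SINK_NAME: &'static str = "my_backend";
///
///     fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
///         // Typically delegates to serde-based parsing via `TryFromBTreeMap`.
///         unimplemented!()
///     }
///     fn new_operator(properties: Self::Properties) -> Result<Operator> {
///         // Build an `opendal::Operator` for the target storage service.
///         unimplemented!()
///     }
///     fn get_path(properties: Self::Properties) -> String {
///         unimplemented!()
///     }
///     fn get_engine_type() -> EngineType {
///         EngineType::Fs
///     }
///     fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
///         unimplemented!()
///     }
/// }
/// ```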
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                    Please change the query to append-only, and specify it \
                    explicitly after the `FORMAT ... ENCODE ...` statement. \
                    For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone()).clone();
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

/// The `FileWriterEnum` enum represents different types of file writers used for various sink
/// implementations.
///
/// # Variants
///
/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter<W>`
///   for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
///   as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
/// - `FileWriter`: Represents a basic `OpenDAL` writer, used for writing files in encodings other than Parquet.
///
/// The choice of writer used during the actual writing process depends on the encode type of the sink.
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

/// Public interface exposed to `BatchingLogSinker`, used to write chunks and commit files.
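///
/// A rough sketch of how a caller might drive the writer (illustrative only; in
/// practice `BatchingLogSinker` owns this loop):
///
/// ```ignore
/// // `writer` is an `OpenDalSinkWriter`, `chunk` a `StreamChunk` of `Insert` ops.
/// writer.write_batch(chunk).await?;
/// // Finish the current file only if the batching strategy says so.
/// let _committed: bool = writer.try_commit().await?;
/// // Or force the current file (if any) to be finished and closed.
/// let _committed: bool = writer.commit().await?;
/// ```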
impl OpenDalSinkWriter {
    /// This method writes a chunk.
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

    /// Closes the current writer, finishes writing the current file, and returns whether a
    /// commit actually happened (`false` if there was no open writer).
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

    /// Tries to commit if the batching condition is met.
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

/// Private methods related to batching.
impl OpenDalSinkWriter {
    /// Returns whether the batching condition is met, i.e. whether the current file should be committed.
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

    /// Writes a chunk and updates the batching state (the number of rows batched so far).
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                // Write the JSON representation of each row in the current chunk to `chunk_buf`.
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    // To avoid a temporary string allocation, write directly to `chunk_buf`
                    // via `write_fmt` (invoked implicitly by `writeln!`).
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap(); // writing to a `BytesMut` should never fail
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

/// Init methods.
impl OpenDalSinkWriter {
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        // Todo: specify more file suffixes based on encode_type.
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        // With batching in place, the file writing process is decoupled from checkpoints.
        // The current file naming convention is as follows:
        // 1. A subdirectory is chosen based on `path_partition_prefix` (e.g., by day, hour, or month, or no partitioning at all).
        // 2. The file name includes the `executor_id` and the creation time in seconds since the UNIX epoch.
        //    If the engine type is `Fs`, the path is handled automatically and the file name does not include a path prefix.
        // 3. For the Snowflake sink, the `write_path` parameter can be empty.
        //    When `write_path` is not specified, the data is written to the root of the specified bucket.
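        // For example (illustrative values only): with a `day` partition prefix, executor id 42,
        // and a creation time of 1735689600 seconds, a Parquet file would be named
        // `<write_path>/2025-01-01/42_1735689600.parquet`.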
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder();
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;

        self.created_time = SystemTime::now();

        Ok(())
    }
}

fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        // This assert is to make sure there is no duplicate field name in the schema.
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field = IcebergArrowConvert
            .to_arrow_field(&rw_field.name.clone(), &rw_field.data_type.clone())?;

        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

/// `BatchingStrategy` represents the strategy for batching data before writing to files.
///
/// This struct contains settings that control how data is collected and
/// partitioned based on specified criteria:
///
/// - `max_row_count`: Maximum number of rows to accumulate before writing a file (defaults to 10240).
/// - `rollover_seconds`: Time interval (in seconds) after which the current file is finished,
///   regardless of the number of accumulated rows (defaults to 10).
/// - `path_partition_prefix`: Specifies how files are organized into directories
///   based on creation time (e.g., by day, month, or hour).
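///
/// A minimal sketch of the default strategy (values mirror the `default_*` helpers at the
/// top of this file; illustrative only):
///
/// ```ignore
/// let strategy = BatchingStrategy {
///     max_row_count: default_max_row_count(),       // at most 10240 rows per file
///     rollover_seconds: default_rollover_seconds(), // finish a file after 10 seconds
///     path_partition_prefix: None,                  // no time-based sub-directory
/// };
/// ```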
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

/// `PathPartitionPrefix` defines the granularity of file partitions based on creation time.
///
/// Each variant specifies how files are organized into directories:
/// - `None`: No partitioning.
/// - `Day`: Files are written in a directory for each day.
/// - `Month`: Files are written in a directory for each month.
/// - `Hour`: Files are written in a directory for each hour.
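///
/// With the format strings used in `OpenDalSinkWriter::path_partition_prefix`, the generated
/// directory prefixes look like `2025-06-01/` (day), `/2025-06/` (month), and
/// `/2025-06-01 15:00/` (hour); the dates here are illustrative.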
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}