risingwave_connector/sink/file_sink/opendal_sink.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::fmt::Write;
17use std::marker::PhantomData;
18use std::sync::Arc;
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
21use anyhow::anyhow;
22use bytes::BytesMut;
23use chrono::{TimeZone, Utc};
24use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
25use parquet::arrow::AsyncArrowWriter;
26use parquet::basic::Compression;
27use parquet::file::properties::WriterProperties;
28use risingwave_common::array::arrow::IcebergArrowConvert;
29use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
30use risingwave_common::array::{Op, StreamChunk};
31use risingwave_common::catalog::Schema;
32use risingwave_pb::id::ExecutorId;
33use serde::Deserialize;
34use serde_json::Value;
35use serde_with::{DisplayFromStr, serde_as};
36use strum_macros::{Display, EnumString};
37use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
38use uuid::Uuid;
39use with_options::WithOptions;
40
41use crate::enforce_secret::EnforceSecret;
42use crate::sink::catalog::SinkEncode;
43use crate::sink::encoder::{
44    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
45    TimestamptzHandlingMode,
46};
47use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
48use crate::sink::{Result, Sink, SinkError, SinkFormatDesc, SinkParam};
49use crate::source::TryFromBTreeMap;
50use crate::with_options::WithOptions;
51
/// Default rollover interval (seconds) before an open file is closed and committed.
pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
/// Default maximum number of rows batched into a single file.
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;
/// Backward-compatible alias: the original name contained a typo (`COUNR`).
/// Prefer [`DEFAULT_MAX_ROW_COUNT`] in new code.
pub const DEFAULT_MAX_ROW_COUNR: usize = DEFAULT_MAX_ROW_COUNT;

/// `serde` default for `BatchingStrategy::rollover_seconds`.
pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
/// `serde` default for `BatchingStrategy::max_row_count`.
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}
/// The `FileSink` struct represents a file sink that uses the `OpendalSinkBackend` trait for its backend implementation.
///
/// # Type Parameters
///
/// - S: The type parameter S represents the concrete implementation of the `OpendalSinkBackend` trait used by this file sink.
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    /// The `OpenDAL` operator used to access the backing storage service.
    pub(crate) op: Operator,
    /// The path to the file where the sink writes data.
    pub(crate) path: String,
    /// The schema describing the structure of the data being written to the file sink.
    pub(crate) schema: Schema,
    /// Whether the sink runs in append-only mode (the only mode currently supported;
    /// see `validate`).
    pub(crate) is_append_only: bool,
    /// The batching strategy for sinking data to files.
    pub(crate) batching_strategy: BatchingStrategy,

    /// The description of the sink's format.
    pub(crate) format_desc: SinkFormatDesc,
    /// The storage engine backing this sink (e.g. S3, GCS, local FS).
    pub(crate) engine_type: EngineType,
    /// Zero-sized marker tying this sink to its backend implementation `S`.
    pub(crate) _marker: PhantomData<S>,
}
82
// Empty impl: relies on `EnforceSecret`'s default behavior.
// NOTE(review): presumably each backend's `Properties` type enforces its own
// secret-bearing options — confirm against the backend implementations.
impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}
84
/// The `OpendalSinkBackend` trait unifies the behavior of various sink backends
/// implemented through `OpenDAL`(`<https://github.com/apache/opendal>`).
///
/// # Type Parameters
///
/// - Properties: Represents the necessary parameters for establishing a backend.
///
/// # Constants
///
/// - `SINK_NAME`: A static string representing the name of the sink.
///
/// # Functions
///
/// - `from_btreemap`: Automatically parse the required parameters from the input create sink statement.
/// - `new_operator`: Creates a new operator using the provided backend properties.
/// - `get_path`: Returns the path of the sink file specified by the user's create sink statement.
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    /// Backend-specific configuration parsed from the `CREATE SINK` options.
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    /// A static string representing the name of the sink.
    const SINK_NAME: &'static str;

    /// Parses the required parameters from the options of the `CREATE SINK` statement.
    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    /// Creates a new `OpenDAL` operator using the provided backend properties.
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    /// Returns the path of the sink file specified by the user's `CREATE SINK` statement.
    fn get_path(properties: Self::Properties) -> String;
    /// Returns which storage engine this backend targets.
    fn get_engine_type() -> EngineType;
    /// Returns the batching strategy parsed from the backend properties.
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}
111
/// The storage services supported by the file sink.
#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}
121
122impl<S: OpendalSinkBackend> Sink for FileSink<S> {
123    type LogSinker = BatchingLogSinker;
124
125    const SINK_NAME: &'static str = S::SINK_NAME;
126
127    async fn validate(&self) -> Result<()> {
128        if matches!(self.engine_type, EngineType::Snowflake) {
129            risingwave_common::license::Feature::SnowflakeSink
130                .check_available()
131                .map_err(|e| anyhow::anyhow!(e))?;
132        }
133        if !self.is_append_only {
134            return Err(SinkError::Config(anyhow!(
135                "File sink only supports append-only mode at present. \
136                    Please change the query to append-only, and specify it \
137                    explicitly after the `FORMAT ... ENCODE ...` statement. \
138                    For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
139            )));
140        }
141
142        if self.format_desc.encode != SinkEncode::Parquet
143            && self.format_desc.encode != SinkEncode::Json
144        {
145            return Err(SinkError::Config(anyhow!(
146                "File sink only supports `PARQUET` and `JSON` encode at present."
147            )));
148        }
149
150        match self.op.list(&self.path).await {
151            Ok(_) => Ok(()),
152            Err(e) => Err(anyhow!(e).into()),
153        }
154    }
155
156    async fn new_log_sinker(
157        &self,
158        writer_param: crate::sink::SinkWriterParam,
159    ) -> Result<Self::LogSinker> {
160        let writer = OpenDalSinkWriter::new(
161            self.op.clone(),
162            &self.path,
163            self.schema.clone(),
164            writer_param.executor_id,
165            &self.format_desc,
166            self.engine_type.clone(),
167            self.batching_strategy.clone(),
168        )?;
169        Ok(BatchingLogSinker::new(writer))
170    }
171}
172
173impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
174    type Error = SinkError;
175
176    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
177        let schema = param.schema();
178        let config = S::from_btreemap(param.properties)?;
179        let path = S::get_path(config.clone());
180        let op = S::new_operator(config.clone())?;
181        let batching_strategy = S::get_batching_strategy(config);
182        let engine_type = S::get_engine_type();
183        let format_desc = match param.format_desc {
184            Some(desc) => desc,
185            None => {
186                if let EngineType::Snowflake = engine_type {
187                    SinkFormatDesc::plain_json_for_snowflake_only()
188                } else {
189                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
190                }
191            }
192        };
193        Ok(Self {
194            op,
195            path,
196            schema,
197            is_append_only: param.sink_type.is_append_only(),
198            batching_strategy,
199            format_desc,
200            engine_type,
201            _marker: PhantomData,
202        })
203    }
204}
205
/// Writer that sinks chunks to files via `OpenDAL`, accumulating rows until a
/// batching condition (row count or rollover time) triggers a commit.
pub struct OpenDalSinkWriter {
    /// Arrow schema used when encoding Parquet record batches.
    schema: SchemaRef,
    /// `OpenDAL` operator used to create file writers on the target storage.
    operator: Operator,
    /// The in-flight file writer, if a file is currently open; `None` between files.
    sink_writer: Option<FileWriterEnum>,
    /// Base directory (prefix) under which output files are written.
    write_path: String,
    /// Id of the executor driving this sink; included in commit logging.
    executor_id: ExecutorId,
    /// UUID (v7) identifying this writer; embedded in generated file names.
    unique_writer_id: Uuid,
    /// Encode of the sink (`Parquet` or `Json`).
    encode_type: SinkEncode,
    /// Row-to-JSON encoder; used on the non-parquet write path.
    row_encoder: JsonEncoder,
    /// The storage engine backing this writer.
    engine_type: EngineType,
    /// The batching strategy for sinking data to files.
    pub(crate) batching_strategy: BatchingStrategy,
    /// Rows written to the currently open file. (`bached` is a long-standing
    /// typo for `batched`, kept for consistency across the file.)
    current_bached_row_num: usize,
    /// Creation time of the currently open file; reset when a new writer is created.
    created_time: SystemTime,
    /// Monotonically increasing per-writer sequence number embedded in file names.
    file_seq: u64,
}
221
/// The `FileWriterEnum` enum represents different types of file writers used for various sink
/// implementations.
///
/// # Variants
///
/// - `ParquetFileWriter`: Represents a Parquet file writer using the `AsyncArrowWriter<W>`
///   for writing data to a Parquet file. It accepts an implementation of W: `AsyncWrite` + `Unpin` + `Send`
///   as the underlying writer. In this case, the `OpendalWriter` serves as the underlying writer.
/// - `FileWriter`: Represents a basic `OpenDAL` writer, for writing files in encodes other than parquet.
///
/// The choice of writer used during the actual writing process depends on the encode type of the sink.
enum FileWriterEnum {
    /// Arrow-based Parquet writer over an `OpenDAL` async-write adapter.
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    /// Raw byte writer used for JSON (and any future non-parquet) encodes.
    FileWriter(OpendalWriter),
}
237
238/// Public interface exposed to `BatchingLogSinker`, used to write chunk and commit files.
239impl OpenDalSinkWriter {
240    /// This method writes a chunk.
241    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
242        if self.sink_writer.is_none() {
243            assert_eq!(self.current_bached_row_num, 0);
244            self.create_sink_writer().await?;
245        };
246        self.append_only(chunk).await?;
247        Ok(())
248    }
249
250    /// This method close current writer, finish writing a file and returns whether the commit is successful.
251    pub async fn commit(&mut self) -> Result<bool> {
252        if let Some(sink_writer) = self.sink_writer.take() {
253            match sink_writer {
254                FileWriterEnum::ParquetFileWriter(w) => {
255                    let bytes_written = w.bytes_written();
256                    if bytes_written > 0 {
257                        w.close().await?;
258                        tracing::debug!(
259                            "writer {} (executor_id: {}, created_time: {}) finish write file, bytes_written: {}",
260                            self.unique_writer_id,
261                            self.executor_id,
262                            self.created_time
263                                .duration_since(UNIX_EPOCH)
264                                .expect("Time went backwards")
265                                .as_secs(),
266                            bytes_written
267                        );
268                    }
269                }
270                FileWriterEnum::FileWriter(mut w) => {
271                    w.close().await?;
272                }
273            };
274            self.current_bached_row_num = 0;
275            return Ok(true);
276        }
277        Ok(false)
278    }
279
280    /// Returns whether there is pending data (i.e., an active writer that has not been committed yet).
281    pub fn has_pending_data(&self) -> bool {
282        self.sink_writer.is_some()
283    }
284
285    // Try commit if the batching condition is met.
286    pub async fn try_commit(&mut self) -> Result<bool> {
287        if self.can_commit() {
288            return self.commit().await;
289        }
290        Ok(false)
291    }
292}
293
/// Private methods related to batching.
impl OpenDalSinkWriter {
    /// Method for judging whether batch condition is met: a commit is due when the
    /// file has been open longer than `rollover_seconds`, or has accumulated at
    /// least `max_row_count` rows.
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_bached_row_num >= self.batching_strategy.max_row_count
    }

    /// Formats the directory prefix derived from the writer's creation time
    /// (`duration` since the UNIX epoch), per the configured partition granularity.
    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        // NOTE(review): `Day` emits only a trailing slash, while `Month`/`Hour`
        // also emit a leading slash — confirm this inconsistency is intentional,
        // since the caller already appends a '/' to the base path.
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    /// Whole seconds elapsed since the current writer was created.
    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

    // Method for writing chunk and update related batching condition.
    // Errors if no writer has been created yet (callers go through `write_batch`,
    // which creates one first).
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                // Convert the chunk to an Arrow record batch and hand it to the
                // async parquet writer.
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_bached_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                // NOTE(review): `capacity()` counts all row slots in the chunk,
                // while the loop below writes only the rows yielded by
                // `chunk.rows()` — confirm these always agree on this path.
                let batch_row_nums = chunk.data_chunk().capacity();
                // write the json representations of the row(s) in current chunk to `chunk_buf`
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    // to prevent temporary string allocation,
                    // so we directly write to `chunk_buf` implicitly via `write_fmt`.
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap(); // write to a `BytesMut` should never fail
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_bached_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}
364
/// Init methods.
impl OpenDalSinkWriter {
    /// Creates a writer for `write_path` on `operator`, converting the RisingWave
    /// schema to an Arrow schema up front and preparing a JSON row encoder.
    ///
    /// The actual file writer is created lazily on the first `write_batch` call.
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: ExecutorId,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        // The JSON encoder is always constructed, even for Parquet sinks; it is
        // only used on the non-parquet (`FileWriter`) write path.
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            // v7 UUIDs embed a timestamp, keeping generated file names roughly sortable.
            unique_writer_id: Uuid::now_v7(),
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_bached_row_num: 0,
            created_time: SystemTime::now(),
            file_seq: 0,
        })
    }

    /// Builds an `OpenDAL` object writer for the next output file, deriving the
    /// object name from the partition prefix, writer UUID, creation time, and a
    /// per-writer file sequence number.
    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        // Todo: specify more file suffixes based on encode_type.
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        // With batching in place, the file writing process is decoupled from checkpoints.
        // The current file naming convention is as follows:
        // 1. A subdirectory is defined based on `path_partition_prefix` (e.g., by day, hour, or month, or none).
        // 2. The file name includes a unique UUID (v7, which contains timestamp) and the creation time in seconds since the UNIX epoch.
        // If the engine type is `Fs`, the path is automatically handled, and the filename does not include a path prefix.
        // 3. For the Snowflake Sink, the `write_path` parameter can be empty.
        // When the `write_path` is not specified, the data will be written to the root of the specified bucket.
        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };
            // Bump the sequence so consecutive files from the same writer never collide.
            let current_file_seq = self.file_seq;
            self.file_seq = self.file_seq.checked_add(1).expect("file seq overflow");

            format!(
                "{}{}{}_{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.unique_writer_id,
                create_time.as_secs(),
                current_file_seq,
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            // Allow up to 8 concurrent part uploads within this writer.
            .concurrent(8)
            .await?)
    }

    /// Creates the encode-specific sink writer (Parquet or plain file) and
    /// resets the per-file batching counters.
    async fn create_sink_writer(&mut self) -> Result<()> {
        // Update the `created_time` to the current time when creating a new writer.
        self.created_time = SystemTime::now();

        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                // Parquet files are written with SNAPPY compression via the
                // async Arrow writer, adapted onto the OpenDAL futures writer.
                let props = WriterProperties::builder().set_compression(Compression::SNAPPY);
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_bached_row_num = 0;

        Ok(())
    }
}
477
478fn convert_rw_schema_to_arrow_schema(
479    rw_schema: risingwave_common::catalog::Schema,
480) -> anyhow::Result<arrow_schema_iceberg::Schema> {
481    let mut schema_fields = HashMap::new();
482    rw_schema.fields.iter().for_each(|field| {
483        let res = schema_fields.insert(&field.name, &field.data_type);
484        // This assert is to make sure there is no duplicate field name in the schema.
485        assert!(res.is_none())
486    });
487    let mut arrow_fields = vec![];
488    for rw_field in &rw_schema.fields {
489        let arrow_field = IcebergArrowConvert
490            .to_arrow_field(&rw_field.name.clone(), &rw_field.data_type.clone())?;
491
492        arrow_fields.push(arrow_field);
493    }
494
495    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
496}
497
/// `BatchingStrategy` represents the strategy for batching data before writing to files.
///
/// This struct contains settings that control how data is collected and
/// partitioned based on specified criteria:
///
/// - `max_row_count`: Maximum number of rows to accumulate before writing
///   (defaults to `DEFAULT_MAX_ROW_COUNR`).
/// - `rollover_seconds`: Time interval (in seconds) that triggers a write,
///   regardless of the number of accumulated rows (defaults to `DEFAULT_ROLLOVER_SECONDS`).
/// - `path_partition_prefix`: Optionally specifies how files are organized into
///   directories based on creation time (e.g., by day, month, or hour).
///
// NOTE(review): the derived `Default` yields zero values for the counters, not
// the serde defaults above — confirm call sites construct this via
// deserialization rather than `Default::default()`.
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}
522
/// `PathPartitionPrefix` defines the granularity of file partitions based on creation time.
///
/// Each variant specifies how files are organized into directories:
/// - `None`: No partitioning.
/// - `Day`: Files are written in a directory for each day.
/// - `Month`: Files are written in a directory for each month.
/// - `Hour`: Files are written in a directory for each hour.
///
/// Parsed from user options via `EnumString`/`Deserialize` (snake_case).
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    /// Default: write all files directly under the sink path.
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}