// risingwave_connector/sink/file_sink/opendal_sink.rs

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_pb::id::ExecutorId;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use uuid::Uuid;
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNR: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNR
}
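
/// `FileSink` writes append-only streaming data to a file-style store (S3, GCS,
/// Azblob, WebHDFS, local fs, or a Snowflake stage) through an OpenDAL
/// [`Operator`]. The concrete backend is chosen by the [`OpendalSinkBackend`]
/// implementation `S`.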
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    pub(crate) path: String,
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    pub(crate) batching_strategy: BatchingStrategy,
    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

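/// `OpendalSinkBackend` abstracts over the per-service configuration of a file
/// sink. An implementor provides:
/// - `from_btreemap`: parse the sink's string properties into a typed config;
/// - `new_operator`: build the OpenDAL [`Operator`] for the target service;
/// - `get_path`: the path (directory/prefix) the sink writes under;
/// - `get_engine_type`: the matching [`EngineType`] variant;
/// - `get_batching_strategy`: the [`BatchingStrategy`] used to roll files over.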
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

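/// The storage service behind a file sink; one variant per supported backend.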
#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if matches!(self.engine_type, EngineType::Snowflake) {
            risingwave_common::license::Feature::SnowflakeSink
                .check_available()
                .map_err(|e| anyhow::anyhow!(e))?;
        }
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                Please change the query to append-only, and specify it \
                explicitly after the `FORMAT ... ENCODE ...` statement. \
                For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone());
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config);
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

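/// `OpenDalSinkWriter` accumulates rows into one output file at a time. It is
/// driven by [`BatchingLogSinker`]: `write_batch` appends a chunk (creating a
/// new file lazily), and `commit` / `try_commit` close the current file once
/// the [`BatchingStrategy`] thresholds are reached.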
pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: ExecutorId,
    unique_writer_id: Uuid,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_bached_row_num: usize,
    created_time: SystemTime,
}

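/// The writer backing the file currently being produced:
/// - `ParquetFileWriter`: an Arrow writer used for `ENCODE PARQUET`;
/// - `FileWriter`: a raw OpenDAL writer that receives newline-delimited JSON
///   for `ENCODE JSON`.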
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

impl OpenDalSinkWriter {
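    /// Writes a `StreamChunk` into the current file, lazily creating the file
    /// and its writer if none is open.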
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_bached_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

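    /// Closes the file that is currently being written, if any, and resets the
    /// batched-row counter. Returns `true` when a file was finished, `false`
    /// when there was no open writer.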
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {} (executor_id: {}, created_time: {}) finish write file, metadata: {:?}",
                            self.unique_writer_id,
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_bached_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

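    /// Commits only when [`Self::can_commit`] reports that a batching threshold
    /// has been reached; otherwise returns `Ok(false)` and keeps the file open.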
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

impl OpenDalSinkWriter {
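    /// A file can be committed once it is old enough (`rollover_seconds`) or
    /// holds enough rows (`max_row_count`).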
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_bached_row_num >= self.batching_strategy.max_row_count
    }

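    /// Builds the optional time-based path prefix (day, month, or hour) from the
    /// writer's creation time; returns an empty string when no partitioning is
    /// configured.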
    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

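    /// Appends an append-only chunk to the open file. For Parquet, the chunk is
    /// converted to an Arrow `RecordBatch`; for JSON, each row is encoded and
    /// written as one newline-delimited JSON object.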
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_bached_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap();
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_bached_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

impl OpenDalSinkWriter {
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: ExecutorId,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            unique_writer_id: Uuid::now_v7(),
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_bached_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

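    /// Opens an OpenDAL writer for a new object named
    /// `{write_path}/{partition_prefix}{writer_id}_{created_secs}.{parquet|json}`.
    /// The `{write_path}/` part is omitted for the `Fs` engine and for Snowflake
    /// stages configured with an empty path.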
    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.unique_writer_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

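    /// (Re)initializes the writer for a new file: refreshes `created_time`,
    /// opens a new object writer, and wraps it in a SNAPPY-compressed Parquet
    /// writer for `ENCODE PARQUET`, or uses the raw writer otherwise.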
    async fn create_sink_writer(&mut self) -> Result<()> {
        self.created_time = SystemTime::now();

        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder().set_compression(Compression::SNAPPY);
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_bached_row_num = 0;

        Ok(())
    }
}

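/// Converts a RisingWave schema into an Arrow (Iceberg-flavored) schema,
/// asserting that field names are unique.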
fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field = IcebergArrowConvert
            .to_arrow_field(&rw_field.name.clone(), &rw_field.data_type.clone())?;

        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

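/// Controls when the writer rolls over to a new file. All fields are parsed
/// from string-valued sink options via `DisplayFromStr`. As an illustration,
/// with the defaults defined at the top of this file the strategy is
/// equivalent to:
///
/// ```text
/// BatchingStrategy {
///     max_row_count: 10240,    // DEFAULT_MAX_ROW_COUNR
///     rollover_seconds: 10,    // DEFAULT_ROLLOVER_SECONDS
///     path_partition_prefix: None,
/// }
/// ```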
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

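/// The time granularity used to prefix object paths, derived from the writer's
/// creation time. Accepted option values follow `snake_case`: `none`, `day`,
/// `month`, or `hour`; for example, `day` produces prefixes like `2024-01-01/`.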
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}