// risingwave_connector/sink/file_sink/opendal_sink.rs

use std::collections::{BTreeMap, HashMap};
use std::fmt::Write;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::anyhow;
use bytes::BytesMut;
use chrono::{TimeZone, Utc};
use opendal::{FuturesAsyncWriter, Operator, Writer as OpendalWriter};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::arrow::arrow_schema_iceberg::{self, SchemaRef};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_json::Value;
use serde_with::{DisplayFromStr, serde_as};
use strum_macros::{Display, EnumString};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use with_options::WithOptions;

use crate::enforce_secret::EnforceSecret;
use crate::sink::catalog::SinkEncode;
use crate::sink::encoder::{
    JsonEncoder, JsonbHandlingMode, RowEncoder, TimeHandlingMode, TimestampHandlingMode,
    TimestamptzHandlingMode,
};
use crate::sink::file_sink::batching_log_sink::BatchingLogSinker;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkError, SinkFormatDesc, SinkParam};
use crate::source::TryFromBTreeMap;
use crate::with_options::WithOptions;

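// Default batching thresholds used by `BatchingStrategy` when they are not configured explicitly.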
pub const DEFAULT_ROLLOVER_SECONDS: usize = 10;
pub const DEFAULT_MAX_ROW_COUNT: usize = 10240;

pub fn default_rollover_seconds() -> usize {
    DEFAULT_ROLLOVER_SECONDS
}
pub fn default_max_row_count() -> usize {
    DEFAULT_MAX_ROW_COUNT
}

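/// A file sink that streams append-only chunks into files on an object store
/// or file system through an Apache OpenDAL [`Operator`]. The concrete backend
/// (S3, GCS, Azblob, WebHDFS, local FS, Snowflake) is selected by the
/// [`OpendalSinkBackend`] type parameter.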
#[derive(Debug, Clone)]
pub struct FileSink<S: OpendalSinkBackend> {
    pub(crate) op: Operator,
    pub(crate) path: String,
    pub(crate) schema: Schema,
    pub(crate) is_append_only: bool,
    pub(crate) batching_strategy: BatchingStrategy,

    pub(crate) format_desc: SinkFormatDesc,
    pub(crate) engine_type: EngineType,
    pub(crate) _marker: PhantomData<S>,
}

impl<S: OpendalSinkBackend> EnforceSecret for FileSink<S> {}

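/// Backend-specific glue for [`FileSink`]: each implementation parses its own
/// properties from the user-supplied options, builds the OpenDAL [`Operator`],
/// and reports the target path, engine type, and batching strategy.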
pub trait OpendalSinkBackend: Send + Sync + 'static + Clone + PartialEq {
    type Properties: TryFromBTreeMap + Send + Sync + Clone + WithOptions;
    const SINK_NAME: &'static str;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties>;
    fn new_operator(properties: Self::Properties) -> Result<Operator>;
    fn get_path(properties: Self::Properties) -> String;
    fn get_engine_type() -> EngineType;
    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy;
}

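/// The storage service behind the sink; used where behavior differs per backend
/// (e.g. path handling).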
#[derive(Clone, Debug)]
pub enum EngineType {
    Gcs,
    S3,
    Fs,
    Azblob,
    Webhdfs,
    Snowflake,
}

impl<S: OpendalSinkBackend> Sink for FileSink<S> {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BatchingLogSinker;

    const SINK_NAME: &'static str = S::SINK_NAME;

    async fn validate(&self) -> Result<()> {
        if !self.is_append_only {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports append-only mode at present. \
                 Please change the query to append-only, and specify it \
                 explicitly after the `FORMAT ... ENCODE ...` statement. \
                 For example, `FORMAT xxx ENCODE xxx(force_append_only='true')`"
            )));
        }

        if self.format_desc.encode != SinkEncode::Parquet
            && self.format_desc.encode != SinkEncode::Json
        {
            return Err(SinkError::Config(anyhow!(
                "File sink only supports `PARQUET` and `JSON` encode at present."
            )));
        }

        match self.op.list(&self.path).await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow!(e).into()),
        }
    }

    async fn new_log_sinker(
        &self,
        writer_param: crate::sink::SinkWriterParam,
    ) -> Result<Self::LogSinker> {
        let writer = OpenDalSinkWriter::new(
            self.op.clone(),
            &self.path,
            self.schema.clone(),
            writer_param.executor_id,
            &self.format_desc,
            self.engine_type.clone(),
            self.batching_strategy.clone(),
        )?;
        Ok(BatchingLogSinker::new(writer))
    }
}

impl<S: OpendalSinkBackend> TryFrom<SinkParam> for FileSink<S> {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = S::from_btreemap(param.properties)?;
        let path = S::get_path(config.clone()).clone();
        let op = S::new_operator(config.clone())?;
        let batching_strategy = S::get_batching_strategy(config.clone());
        let engine_type = S::get_engine_type();
        let format_desc = match param.format_desc {
            Some(desc) => desc,
            None => {
                if let EngineType::Snowflake = engine_type {
                    SinkFormatDesc::plain_json_for_snowflake_only()
                } else {
                    return Err(SinkError::Config(anyhow!("missing FORMAT ... ENCODE ...")));
                }
            }
        };
        Ok(Self {
            op,
            path,
            schema,
            is_append_only: param.sink_type.is_append_only(),
            batching_strategy,
            format_desc,
            engine_type,
            _marker: PhantomData,
        })
    }
}

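/// The writer that buffers rows into one output file at a time. A new file is
/// started lazily on the first write and finished according to the configured
/// [`BatchingStrategy`].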
pub struct OpenDalSinkWriter {
    schema: SchemaRef,
    operator: Operator,
    sink_writer: Option<FileWriterEnum>,
    write_path: String,
    executor_id: u64,
    encode_type: SinkEncode,
    row_encoder: JsonEncoder,
    engine_type: EngineType,
    pub(crate) batching_strategy: BatchingStrategy,
    current_batched_row_num: usize,
    created_time: SystemTime,
}

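/// The underlying file writer: an Arrow-based Parquet writer, or a plain
/// OpenDAL writer used for newline-delimited JSON.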
enum FileWriterEnum {
    ParquetFileWriter(AsyncArrowWriter<Compat<FuturesAsyncWriter>>),
    FileWriter(OpendalWriter),
}

impl OpenDalSinkWriter {
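    /// Writes a stream chunk, lazily creating the sink writer for a new file
    /// if none is currently open.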
    pub async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        if self.sink_writer.is_none() {
            assert_eq!(self.current_batched_row_num, 0);
            self.create_sink_writer().await?;
        };
        self.append_only(chunk).await?;
        Ok(())
    }

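    /// Finishes the file currently being written, if any: the underlying
    /// writer is closed (an empty Parquet writer is simply dropped) and the
    /// batched-row counter is reset. Returns `true` if a writer was open.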
    pub async fn commit(&mut self) -> Result<bool> {
        if let Some(sink_writer) = self.sink_writer.take() {
            match sink_writer {
                FileWriterEnum::ParquetFileWriter(w) => {
                    if w.bytes_written() > 0 {
                        let metadata = w.close().await?;
                        tracing::info!(
                            "writer {:?}_{:?} finished writing file, metadata: {:?}",
                            self.executor_id,
                            self.created_time
                                .duration_since(UNIX_EPOCH)
                                .expect("Time went backwards")
                                .as_secs(),
                            metadata
                        );
                    }
                }
                FileWriterEnum::FileWriter(mut w) => {
                    w.close().await?;
                }
            };
            self.current_batched_row_num = 0;
            return Ok(true);
        }
        Ok(false)
    }

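    /// Commits only when the batching thresholds (rollover interval or maximum
    /// row count) have been reached.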
    pub async fn try_commit(&mut self) -> Result<bool> {
        if self.can_commit() {
            return self.commit().await;
        }
        Ok(false)
    }
}

impl OpenDalSinkWriter {
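    /// Returns `true` once the rollover interval has elapsed or the batched
    /// row count reaches the configured maximum.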
    fn can_commit(&self) -> bool {
        self.duration_seconds_since_writer_created() >= self.batching_strategy.rollover_seconds
            || self.current_batched_row_num >= self.batching_strategy.max_row_count
    }

    fn path_partition_prefix(&self, duration: &Duration) -> String {
        let datetime = Utc
            .timestamp_opt(duration.as_secs() as i64, 0)
            .single()
            .expect("Failed to convert timestamp to DateTime<Utc>")
            .with_timezone(&Utc);
        let path_partition_prefix = self
            .batching_strategy
            .path_partition_prefix
            .as_ref()
            .unwrap_or(&PathPartitionPrefix::None);
        match path_partition_prefix {
            PathPartitionPrefix::None => "".to_owned(),
            PathPartitionPrefix::Day => datetime.format("%Y-%m-%d/").to_string(),
            PathPartitionPrefix::Month => datetime.format("/%Y-%m/").to_string(),
            PathPartitionPrefix::Hour => datetime.format("/%Y-%m-%d %H:00/").to_string(),
        }
    }

    fn duration_seconds_since_writer_created(&self) -> usize {
        let now = SystemTime::now();
        now.duration_since(self.created_time)
            .expect("Time went backwards")
            .as_secs() as usize
    }

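    /// Appends an append-only chunk to the open writer: Parquet data goes
    /// through [`IcebergArrowConvert`] as an Arrow record batch, while JSON
    /// data is encoded row by row as newline-delimited JSON.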
    async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
        match self
            .sink_writer
            .as_mut()
            .ok_or_else(|| SinkError::File("Sink writer is not created.".to_owned()))?
        {
            FileWriterEnum::ParquetFileWriter(w) => {
                let batch =
                    IcebergArrowConvert.to_record_batch(self.schema.clone(), chunk.data_chunk())?;
                let batch_row_nums = batch.num_rows();
                w.write(&batch).await?;
                self.current_batched_row_num += batch_row_nums;
            }
            FileWriterEnum::FileWriter(w) => {
                let mut chunk_buf = BytesMut::new();
                let batch_row_nums = chunk.data_chunk().capacity();
                for (op, row) in chunk.rows() {
                    assert_eq!(op, Op::Insert, "expect all `op(s)` to be `Op::Insert`");
                    writeln!(
                        chunk_buf,
                        "{}",
                        Value::Object(self.row_encoder.encode(row)?)
                    )
                    .unwrap();
                }
                w.write(chunk_buf.freeze()).await?;
                self.current_batched_row_num += batch_row_nums;
            }
        }
        Ok(())
    }
}

impl OpenDalSinkWriter {
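    /// Creates a writer bound to the given operator, path, schema, and format.
    /// No file is opened yet; the underlying writer is created on the first
    /// [`Self::write_batch`] call.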
    pub fn new(
        operator: Operator,
        write_path: &str,
        rw_schema: Schema,
        executor_id: u64,
        format_desc: &SinkFormatDesc,
        engine_type: EngineType,
        batching_strategy: BatchingStrategy,
    ) -> Result<Self> {
        let arrow_schema = convert_rw_schema_to_arrow_schema(rw_schema.clone())?;
        let jsonb_handling_mode = JsonbHandlingMode::from_options(&format_desc.options)?;
        let row_encoder = JsonEncoder::new(
            rw_schema,
            None,
            crate::sink::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
            jsonb_handling_mode,
        );
        Ok(Self {
            schema: Arc::new(arrow_schema),
            write_path: write_path.to_owned(),
            operator,
            sink_writer: None,
            executor_id,
            encode_type: format_desc.encode.clone(),
            row_encoder,
            engine_type,
            batching_strategy,
            current_batched_row_num: 0,
            created_time: SystemTime::now(),
        })
    }

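    /// Opens an OpenDAL writer for a new object named
    /// `[base path][optional time-partition prefix][executor_id]_[creation epoch secs].[suffix]`.
    /// The local-FS engine (and Snowflake with an empty path) writes relative
    /// to the operator root.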
    async fn create_object_writer(&mut self) -> Result<OpendalWriter> {
        let suffix = match self.encode_type {
            SinkEncode::Parquet => "parquet",
            SinkEncode::Json => "json",
            _ => unimplemented!(),
        };

        let create_time = self
            .created_time
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards");

        let object_name = {
            let base_path = match self.engine_type {
                EngineType::Fs => "".to_owned(),
                EngineType::Snowflake if self.write_path.is_empty() => "".to_owned(),
                _ => format!("{}/", self.write_path),
            };

            format!(
                "{}{}{}_{}.{}",
                base_path,
                self.path_partition_prefix(&create_time),
                self.executor_id,
                create_time.as_secs(),
                suffix,
            )
        };
        Ok(self
            .operator
            .writer_with(&object_name)
            .concurrent(8)
            .await?)
    }

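    /// Creates the encoder-specific writer (Parquet or plain file) for a fresh
    /// object and resets the batching state (row counter and creation time).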
    async fn create_sink_writer(&mut self) -> Result<()> {
        let object_writer = self.create_object_writer().await?;
        match self.encode_type {
            SinkEncode::Parquet => {
                let props = WriterProperties::builder();
                let parquet_writer: tokio_util::compat::Compat<opendal::FuturesAsyncWriter> =
                    object_writer.into_futures_async_write().compat_write();
                self.sink_writer = Some(FileWriterEnum::ParquetFileWriter(
                    AsyncArrowWriter::try_new(
                        parquet_writer,
                        self.schema.clone(),
                        Some(props.build()),
                    )?,
                ));
            }
            _ => {
                self.sink_writer = Some(FileWriterEnum::FileWriter(object_writer));
            }
        }
        self.current_batched_row_num = 0;

        self.created_time = SystemTime::now();

        Ok(())
    }
}

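/// Converts a RisingWave schema into an Arrow schema (Iceberg flavor),
/// asserting that field names are unique.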
fn convert_rw_schema_to_arrow_schema(
    rw_schema: risingwave_common::catalog::Schema,
) -> anyhow::Result<arrow_schema_iceberg::Schema> {
    let mut schema_fields = HashMap::new();
    rw_schema.fields.iter().for_each(|field| {
        let res = schema_fields.insert(&field.name, &field.data_type);
        assert!(res.is_none())
    });
    let mut arrow_fields = vec![];
    for rw_field in &rw_schema.fields {
        let arrow_field = IcebergArrowConvert
            .to_arrow_field(&rw_field.name.clone(), &rw_field.data_type.clone())?;

        arrow_fields.push(arrow_field);
    }

    Ok(arrow_schema_iceberg::Schema::new(arrow_fields))
}

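/// User-facing batching options: a file is finalized once `max_row_count` rows
/// are buffered or `rollover_seconds` have elapsed, and `path_partition_prefix`
/// optionally places files under day/month/hour sub-directories.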
#[serde_as]
#[derive(Default, Deserialize, Debug, Clone, WithOptions)]
pub struct BatchingStrategy {
    #[serde(default = "default_max_row_count")]
    #[serde_as(as = "DisplayFromStr")]
    pub max_row_count: usize,
    #[serde(default = "default_rollover_seconds")]
    #[serde_as(as = "DisplayFromStr")]
    pub rollover_seconds: usize,
    #[serde(default)]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub path_partition_prefix: Option<PathPartitionPrefix>,
}

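/// Time-based directory prefix for written files, derived from the writer's
/// creation time.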
#[derive(Default, Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum PathPartitionPrefix {
    #[default]
    None = 0,
    #[serde(alias = "day")]
    Day = 1,
    #[serde(alias = "month")]
    Month = 2,
    #[serde(alias = "hour")]
    Hour = 3,
}