1use core::num::NonZeroU64;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18
19use anyhow::{Context, anyhow};
20use async_trait::async_trait;
21use deltalake::DeltaTable;
22use deltalake::aws::storage::s3_constants::{
23 AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
24 AWS_SECRET_ACCESS_KEY,
25};
26use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
27use deltalake::operations::transaction::CommitBuilder;
28use deltalake::protocol::{DeltaOperation, SaveMode};
29use deltalake::writer::{DeltaWriter, RecordBatchWriter};
30use risingwave_common::array::StreamChunk;
31use risingwave_common::array::arrow::DeltaLakeConvert;
32use risingwave_common::bail;
33use risingwave_common::catalog::Schema;
34use risingwave_common::types::DataType;
35use risingwave_common::util::iter_util::ZipEqDebug;
36use risingwave_pb::connector_service::SinkMetadata;
37use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
38use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
39use sea_orm::DatabaseConnection;
40use serde_derive::{Deserialize, Serialize};
41use serde_with::{DisplayFromStr, serde_as};
42use with_options::WithOptions;
43
44use super::coordinate::CoordinatedLogSinker;
45use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
46use super::writer::SinkWriter;
47use super::{
48 Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
49 SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
50};
51use crate::connector_common::AwsAuthProps;
52
/// Connector name of the Delta Lake sink; also used as `Sink::SINK_NAME`.
pub const DELTALAKE_SINK: &str = "deltalake";
/// Region written into the S3 storage options when the AWS SDK config carries no region.
pub const DEFAULT_REGION: &str = "us-east-1";
/// Storage-option key under which the GCS service account value is handed to `deltalake`.
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";
56
/// Options describing where the Delta table lives and how to reach its object store.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    // Table location. `get_table_url` accepts the `s3://`, `s3a://`, `gs://` and `file://`
    // prefixes and rejects everything else.
    #[serde(rename = "location")]
    pub location: String,
    // AWS options, flattened into the same option map; used to build the SDK config that
    // supplies credentials, region and endpoint for S3 locations.
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    // Service account value; required when `location` is a `gs://` URL.
    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    // Presumably the number of checkpoints between two commits (TODO confirm against
    // `CoordinatedLogSinker`). Parsed from its string form; `validate` rejects 0.
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}
72
73impl DeltaLakeCommon {
74 pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
75 let table = match Self::get_table_url(&self.location)? {
76 DeltaTableUrl::S3(s3_path) => {
77 let storage_options = self.build_delta_lake_config_for_aws().await?;
78 deltalake::aws::register_handlers(None);
79 deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await?
80 }
81 DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
82 DeltaTableUrl::Gcs(gcs_path) => {
83 let mut storage_options = HashMap::new();
84 storage_options.insert(
85 GCS_SERVICE_ACCOUNT.to_owned(),
86 self.gcs_service_account.clone().ok_or_else(|| {
87 SinkError::Config(anyhow!(
88 "gcs.service.account is required with Google Cloud Storage (GCS)"
89 ))
90 })?,
91 );
92 deltalake::gcp::register_handlers(None);
93 deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
94 .await?
95 }
96 };
97 Ok(table)
98 }
99
100 fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
101 if path.starts_with("s3://") || path.starts_with("s3a://") {
102 Ok(DeltaTableUrl::S3(path.to_owned()))
103 } else if path.starts_with("gs://") {
104 Ok(DeltaTableUrl::Gcs(path.to_owned()))
105 } else if let Some(path) = path.strip_prefix("file://") {
106 Ok(DeltaTableUrl::Local(path.to_owned()))
107 } else {
108 Err(SinkError::DeltaLake(anyhow!(
109 "path should start with 's3://','s3a://'(s3) ,gs://(gcs) or file://(local)"
110 )))
111 }
112 }
113
114 async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
115 let mut storage_options = HashMap::new();
116 storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
117 storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
118 let sdk_config = self.aws_auth_props.build_config().await?;
119 let credentials = sdk_config
120 .credentials_provider()
121 .ok_or_else(|| {
122 SinkError::Config(anyhow!(
123 "s3.access.key and s3.secret.key is required with aws s3"
124 ))
125 })?
126 .as_ref()
127 .provide_credentials()
128 .await
129 .map_err(|e| SinkError::Config(e.into()))?;
130 let region = sdk_config.region();
131 let endpoint = sdk_config.endpoint_url();
132 storage_options.insert(
133 AWS_ACCESS_KEY_ID.to_owned(),
134 credentials.access_key_id().to_owned(),
135 );
136 storage_options.insert(
137 AWS_SECRET_ACCESS_KEY.to_owned(),
138 credentials.secret_access_key().to_owned(),
139 );
140 if endpoint.is_none() && region.is_none() {
141 return Err(SinkError::Config(anyhow!(
142 "s3.endpoint and s3.region need to be filled with at least one"
143 )));
144 }
145 storage_options.insert(
146 AWS_REGION.to_owned(),
147 region
148 .map(|r| r.as_ref().to_owned())
149 .clone()
150 .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
151 );
152 if let Some(s3_endpoint) = endpoint {
153 storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
154 }
155 Ok(storage_options)
156 }
157}
158
/// Storage backend of a table location, as classified by `DeltaLakeCommon::get_table_url`.
enum DeltaTableUrl {
    /// `s3://` or `s3a://` location; holds the full URL.
    S3(String),
    /// `file://` location; holds the path with the `file://` prefix removed.
    Local(String),
    /// `gs://` location; holds the full URL.
    Gcs(String),
}
164
/// Full configuration of a Delta Lake sink, deserialized from the sink's property map.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    // Location and credential options, flattened into the same option map.
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    // Sink type; `validate` only accepts `SINK_TYPE_APPEND_ONLY` and
    // `SINK_USER_FORCE_APPEND_ONLY_OPTION`.
    pub r#type: String,
}
173
174impl DeltaLakeConfig {
175 pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
176 let config = serde_json::from_value::<DeltaLakeConfig>(
177 serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
178 )
179 .map_err(|e| SinkError::Config(anyhow!(e)))?;
180 Ok(config)
181 }
182}
183
/// Delta Lake sink: the parsed connector configuration together with the sink parameters.
#[derive(Debug)]
pub struct DeltaLakeSink {
    /// Parsed connector options.
    pub config: DeltaLakeConfig,
    /// Sink parameters; the schema and downstream primary key are read from here.
    param: SinkParam,
}
189
190impl DeltaLakeSink {
191 pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
192 Ok(Self { config, param })
193 }
194}
195
196fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
197 let result = match rw_data_type {
198 DataType::Boolean => {
199 matches!(
200 dl_data_type,
201 DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
202 )
203 }
204 DataType::Int16 => {
205 matches!(
206 dl_data_type,
207 DeltaLakeDataType::Primitive(PrimitiveType::Short)
208 )
209 }
210 DataType::Int32 => {
211 matches!(
212 dl_data_type,
213 DeltaLakeDataType::Primitive(PrimitiveType::Integer)
214 )
215 }
216 DataType::Int64 => {
217 matches!(
218 dl_data_type,
219 DeltaLakeDataType::Primitive(PrimitiveType::Long)
220 )
221 }
222 DataType::Float32 => {
223 matches!(
224 dl_data_type,
225 DeltaLakeDataType::Primitive(PrimitiveType::Float)
226 )
227 }
228 DataType::Float64 => {
229 matches!(
230 dl_data_type,
231 DeltaLakeDataType::Primitive(PrimitiveType::Double)
232 )
233 }
234 DataType::Decimal => {
235 matches!(
236 dl_data_type,
237 DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_, _))
238 )
239 }
240 DataType::Date => {
241 matches!(
242 dl_data_type,
243 DeltaLakeDataType::Primitive(PrimitiveType::Date)
244 )
245 }
246 DataType::Varchar => {
247 matches!(
248 dl_data_type,
249 DeltaLakeDataType::Primitive(PrimitiveType::String)
250 )
251 }
252 DataType::Timestamptz => {
253 matches!(
254 dl_data_type,
255 DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
256 )
257 }
258 DataType::Struct(rw_struct) => {
259 if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
260 let mut result = true;
261 for ((rw_name, rw_type), dl_field) in
262 rw_struct.iter().zip_eq_debug(dl_struct.fields())
263 {
264 result = check_field_type(rw_type, dl_field.data_type())?
265 && result
266 && rw_name.eq(dl_field.name());
267 }
268 result
269 } else {
270 false
271 }
272 }
273 DataType::List(rw_list) => {
274 if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
275 check_field_type(rw_list, dl_list.element_type())?
276 } else {
277 false
278 }
279 }
280 _ => {
281 return Err(SinkError::DeltaLake(anyhow!(
282 "deltalake cannot support type {:?}",
283 rw_data_type.to_owned()
284 )));
285 }
286 };
287 Ok(result)
288}
289
290impl Sink for DeltaLakeSink {
291 type Coordinator = DeltaLakeSinkCommitter;
292 type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;
293
294 const SINK_NAME: &'static str = DELTALAKE_SINK;
295
296 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
297 let inner = DeltaLakeSinkWriter::new(
298 self.config.clone(),
299 self.param.schema().clone(),
300 self.param.downstream_pk.clone(),
301 )
302 .await?;
303
304 let commit_checkpoint_interval =
305 NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
306 "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
307 );
308
309 let writer = CoordinatedLogSinker::new(
310 &writer_param,
311 self.param.clone(),
312 inner,
313 commit_checkpoint_interval,
314 )
315 .await?;
316
317 Ok(writer)
318 }
319
320 async fn validate(&self) -> Result<()> {
321 if self.config.r#type != SINK_TYPE_APPEND_ONLY
322 && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
323 {
324 return Err(SinkError::Config(anyhow!(
325 "only append-only delta lake sink is supported",
326 )));
327 }
328 let table = self.config.common.create_deltalake_client().await?;
329 let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
330 .get_schema()?
331 .fields()
332 .map(|f| (f.name(), f.data_type()))
333 .collect();
334 if deltalake_fields.len() != self.param.schema().fields().len() {
335 return Err(SinkError::DeltaLake(anyhow!(
336 "Columns mismatch. RisingWave is {}, DeltaLake is {}",
337 self.param.schema().fields().len(),
338 deltalake_fields.len()
339 )));
340 }
341 for field in self.param.schema().fields() {
342 if !deltalake_fields.contains_key(&field.name) {
343 return Err(SinkError::DeltaLake(anyhow!(
344 "column {} not found in deltalake table",
345 field.name
346 )));
347 }
348 let deltalake_field_type: &&DeltaLakeDataType = deltalake_fields
349 .get(&field.name)
350 .ok_or_else(|| SinkError::DeltaLake(anyhow!("cannot find field type")))?;
351 if !check_field_type(&field.data_type, deltalake_field_type)? {
352 return Err(SinkError::DeltaLake(anyhow!(
353 "column {} type is not match, deltalake is {:?}, rw is{:?}",
354 field.name,
355 deltalake_field_type,
356 field.data_type
357 )));
358 }
359 }
360 if self.config.common.commit_checkpoint_interval == 0 {
361 return Err(SinkError::Config(anyhow!(
362 "`commit_checkpoint_interval` must be greater than 0"
363 )));
364 }
365 Ok(())
366 }
367
368 fn is_coordinated_sink(&self) -> bool {
369 true
370 }
371
372 async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
373 Ok(DeltaLakeSinkCommitter {
374 table: self.config.common.create_deltalake_client().await?,
375 })
376 }
377}
378
379impl TryFrom<SinkParam> for DeltaLakeSink {
380 type Error = SinkError;
381
382 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
383 let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
384 DeltaLakeSink::new(config, param)
385 }
386}
387
/// Writer that converts stream chunks into Arrow record batches and hands them to a
/// `RecordBatchWriter` for the target Delta table.
pub struct DeltaLakeSinkWriter {
    /// Sink configuration the writer was created from.
    pub config: DeltaLakeConfig,
    /// RisingWave schema of the sink; stored but not read by the writer.
    #[expect(dead_code)]
    schema: Schema,
    /// Primary-key column indices; stored but not read by the writer.
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    /// Delta Lake writer that receives the record batches and is flushed on checkpoint barriers.
    writer: RecordBatchWriter,
    /// Arrow schema derived from the Delta table schema; used when converting chunks.
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    /// Handle to the opened table; stored but not read after construction.
    #[expect(dead_code)]
    dl_table: DeltaTable,
}
399
400impl DeltaLakeSinkWriter {
401 pub async fn new(
402 config: DeltaLakeConfig,
403 schema: Schema,
404 pk_indices: Vec<usize>,
405 ) -> Result<Self> {
406 let dl_table = config.common.create_deltalake_client().await?;
407 let writer = RecordBatchWriter::for_table(&dl_table)?;
408 let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
409 Arc::new(convert_schema(dl_table.get_schema()?)?);
410
411 Ok(Self {
412 config,
413 schema,
414 pk_indices,
415 writer,
416 dl_schema,
417 dl_table,
418 })
419 }
420
421 async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
422 let a = DeltaLakeConvert
423 .to_record_batch(self.dl_schema.clone(), &chunk)
424 .context("convert record batch error")
425 .map_err(SinkError::DeltaLake)?;
426 self.writer.write(a).await?;
427 Ok(())
428 }
429}
430
431fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
432 let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
433 for field in schema.fields() {
434 let dl_field = deltalake::arrow::datatypes::Field::new(
435 field.name(),
436 deltalake::arrow::datatypes::DataType::try_from(field.data_type())
437 .context("convert schema error")
438 .map_err(SinkError::DeltaLake)?,
439 field.is_nullable(),
440 );
441 builder.push(dl_field);
442 }
443 Ok(builder.finish())
444}
445
446#[async_trait]
447impl SinkWriter for DeltaLakeSinkWriter {
448 type CommitMetadata = Option<SinkMetadata>;
449
450 async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
451 self.write(chunk).await
452 }
453
454 async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
455 Ok(())
456 }
457
458 async fn abort(&mut self) -> Result<()> {
459 Ok(())
460 }
461
462 async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
463 if !is_checkpoint {
464 return Ok(None);
465 }
466
467 let adds = self.writer.flush().await?;
468 Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
469 adds,
470 })?))
471 }
472}
473
/// Commit coordinator for the Delta Lake sink; appends the writers' `Add` actions to the
/// table log.
pub struct DeltaLakeSinkCommitter {
    /// Table handle used for committing; refreshed with `update()` after each commit.
    table: DeltaTable,
}
477
478#[async_trait::async_trait]
479impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
480 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
481 tracing::info!("DeltaLake commit coordinator inited.");
482 Ok(None)
483 }
484
485 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
486 tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
487
488 let deltalake_write_result = metadata
489 .iter()
490 .map(DeltaLakeWriteResult::try_from)
491 .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
492 let write_adds: Vec<Action> = deltalake_write_result
493 .into_iter()
494 .flat_map(|v| v.adds.into_iter())
495 .map(Action::Add)
496 .collect();
497
498 if write_adds.is_empty() {
499 return Ok(());
500 }
501 let partition_cols = self.table.metadata()?.partition_columns.clone();
502 let partition_by = if !partition_cols.is_empty() {
503 Some(partition_cols)
504 } else {
505 None
506 };
507 let operation = DeltaOperation::Write {
508 mode: SaveMode::Append,
509 partition_by,
510 predicate: None,
511 };
512 let version = CommitBuilder::default()
513 .with_actions(write_adds)
514 .build(
515 Some(self.table.snapshot()?),
516 self.table.log_store().clone(),
517 operation,
518 )
519 .await?
520 .version();
521 self.table.update().await?;
522 tracing::info!(
523 "Succeeded to commit ti DeltaLake table in epoch {epoch} version {version}."
524 );
525 Ok(())
526 }
527}
528
/// Result of one writer flush: the `Add` actions that still have to be committed to the
/// table log. Travels from writer to coordinator inside `SinkMetadata`.
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    /// `Add` actions returned by the writer's flush.
    adds: Vec<Add>,
}
533
534impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
535 type Error = SinkError;
536
537 fn try_from(value: &'a DeltaLakeWriteResult) -> std::prelude::v1::Result<Self, Self::Error> {
538 let metadata =
539 serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
540 Ok(SinkMetadata {
541 metadata: Some(Serialized(SerializedMetadata { metadata })),
542 })
543 }
544}
545
546impl DeltaLakeWriteResult {
547 fn try_from(value: &SinkMetadata) -> Result<Self> {
548 if let Some(Serialized(v)) = &value.metadata {
549 let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
550 .context("Can't deserialize deltalake sink metadata")?;
551 Ok(DeltaLakeWriteResult { adds })
552 } else {
553 bail!("Can't create deltalake sink write result from empty data!")
554 }
555 }
556}
557
#[cfg(all(test, not(madsim)))]
mod test {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::deltalake::DeltaLakeSinkCommitter;
    use crate::sink::writer::SinkWriter;

    /// End-to-end check on a local table: write three rows, commit them through the
    /// coordinator, then read the table back and expect three rows in both columns.
    #[tokio::test]
    async fn test_deltalake() {
        // Create an empty two-column table in a temporary directory.
        let tmp_dir = tempfile::tempdir().unwrap();
        let table_path = tmp_dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(table_path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let sink_properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{table_path}"),
        };

        let rw_schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let config = DeltaLakeConfig::from_btreemap(sink_properties).unwrap();
        let committer_table = config.common.create_deltalake_client().await.unwrap();

        // Write one chunk with three inserts.
        let mut writer = DeltaLakeSinkWriter::new(config, rw_schema, vec![0])
            .await
            .unwrap();
        let ops = vec![Op::Insert, Op::Insert, Op::Insert];
        let columns = vec![
            I32Array::from_iter(vec![1, 2, 3]).into_ref(),
            Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
        ];
        writer.write(StreamChunk::new(ops, columns)).await.unwrap();

        // Flush on a checkpoint barrier and commit the produced metadata.
        let mut committer = DeltaLakeSinkCommitter {
            table: committer_table,
        };
        let checkpoint_metadata = writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![checkpoint_metadata]).await.unwrap();

        // Read the table back through DataFusion.
        let session = deltalake::datafusion::prelude::SessionContext::new();
        let reopened_table = deltalake::open_table(table_path).await.unwrap();
        session
            .register_table("demo", std::sync::Arc::new(reopened_table))
            .unwrap();

        let batches = session
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        let first_batch = batches.first().unwrap();
        assert_eq!(3, first_batch.column(0).len());
        assert_eq!(3, first_batch.column(1).len());
    }
}