use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
    SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

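/// Connection options shared by all DeltaLake sink destinations: the table `location`,
/// AWS/GCS credentials, and the checkpoint-based commit interval.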
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                deltalake::open_table_with_storage_options(&s3_path, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required with Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
                    .await?
            }
        };
        Ok(table)
    }

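    /// Classify the table location by URL scheme: `s3://`/`s3a://`, `gs://`, or `file://`.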
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

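    /// Build the object-store options for S3 from the configured AWS credentials, region, and endpoint.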
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required with AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

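/// Storage backend parsed from the `location` option.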
enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

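/// Complete DeltaLake sink configuration: the common connection options plus the sink `type`.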
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

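/// Append-only DeltaLake sink. Writes are flushed per checkpoint and committed through a
/// coordinator so that the actions from all parallel writers land in a single Delta transaction.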
#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

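/// Check that a RisingWave column type is compatible with the corresponding DeltaLake field type.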
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list, dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "DeltaLake sink does not support type {:?}",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

impl Sink for DeltaLakeSink {
    type Coordinator = DeltaLakeSinkCommitter;
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
    const SINK_NAME: &'static str = super::DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk.clone(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only delta lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
            .get_schema()?
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Ok(DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        })
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

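/// Streaming writer: converts stream chunks into Arrow record batches and buffers them in a
/// `RecordBatchWriter`; the resulting `Add` actions are handed to the coordinator at checkpoints.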
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
            Arc::new(convert_schema(dl_table.get_schema()?)?);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

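/// Convert a DeltaLake table schema into the Arrow schema expected by the record batch writer.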
fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
    let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
    for field in schema.fields() {
        let arrow_field_type = deltalake::arrow::datatypes::DataType::try_from(field.data_type())
            .with_context(|| {
                format!(
                    "Failed to convert DeltaLake data type {:?} to Arrow data type for field '{}'",
                    field.data_type(),
                    field.name()
                )
            })
            .map_err(SinkError::DeltaLake)?;
        let dl_field = deltalake::arrow::datatypes::Field::new(
            field.name(),
            arrow_field_type,
            field.is_nullable(),
        );
        builder.push(dl_field);
    }
    Ok(builder.finish())
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

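/// Commit coordinator: gathers the `Add` actions reported by all writers for an epoch and
/// commits them to the Delta log as a single append transaction.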
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(None)
    }

    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.metadata()?.partition_columns.clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!(
            "Successfully committed to DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

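/// Writer-side flush result (the produced `Add` actions), serialized into `SinkMetadata`
/// for the commit coordinator.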
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

#[cfg(all(test, not(madsim)))]
mod test {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata]).await.unwrap();

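        // Read the table back with DataFusion and verify that all three inserted rows are visible.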
696 let table = deltalake::open_table(path).await.unwrap();
697 ctx.register_table("demo", std::sync::Arc::new(table))
698 .unwrap();
699
700 let batches = ctx
701 .sql("SELECT * FROM demo")
702 .await
703 .unwrap()
704 .collect()
705 .await
706 .unwrap();
707 assert_eq!(3, batches.get(0).unwrap().column(0).len());
708 assert_eq!(3, batches.get(0).unwrap().column(1).len());
709 }
710}