use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType, StructType};
use deltalake::operations::transaction::CommitBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use sea_orm::DatabaseConnection;
use serde_derive::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use with_options::WithOptions;

use super::coordinate::CoordinatedLogSinker;
use super::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use super::writer::SinkWriter;
use super::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkCommitCoordinator,
    SinkCommittedEpochSubscriber, SinkError, SinkParam, SinkWriterParam,
};
use crate::connector_common::AwsAuthProps;
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};

pub const DELTALAKE_SINK: &str = "deltalake";
pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

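/// Options shared by every Delta Lake sink, regardless of the backing object
/// store (S3, GCS, or local filesystem). `location` is the table URI; the AWS
/// options are flattened in from [`AwsAuthProps`].
///
/// A minimal `CREATE SINK` might look like the following (illustrative values
/// only; the bucket and table names are placeholders):
///
/// ```sql
/// CREATE SINK s1 FROM mv WITH (
///     connector = 'deltalake',
///     type = 'append-only',
///     location = 's3://my-bucket/my-table',
///     s3.access.key = '...',
///     s3.secret.key = '...',
///     s3.region = 'us-east-1'
/// );
/// ```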
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
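    /// Opens the Delta table at `self.location`, dispatching on the URL
    /// scheme: S3 and GCS go through `open_table_with_storage_options` with
    /// backend-specific credentials, while local paths use `open_table`
    /// directly.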
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                deltalake::open_table_with_storage_options(s3_path.clone(), storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => deltalake::open_table(local_path).await?,
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required when sinking to Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                deltalake::open_table_with_storage_options(gcs_path.clone(), storage_options)
                    .await?
            }
        };
        Ok(table)
    }

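    /// Classifies `path` by its scheme. `s3://` and `s3a://` map to S3,
    /// `gs://` to GCS, and `file://` (with the prefix stripped) to the local
    /// filesystem; anything else is rejected.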
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

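    /// Builds the storage-option map that `deltalake` expects for S3: static
    /// credentials resolved from [`AwsAuthProps`], plus region and
    /// (optionally) a custom endpoint. At least one of `s3.endpoint` and
    /// `s3.region` must be configured; the region falls back to
    /// [`DEFAULT_REGION`] when only an endpoint is given.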
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

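/// A table location, already classified by scheme. The wrapped string is the
/// path handed to the corresponding `deltalake` open call.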
enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

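/// The full sink configuration: the common Delta Lake options plus the sink
/// `type` (only append-only, possibly user-forced, is accepted; see
/// `validate`).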
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

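/// Checks that a RisingWave type and a Delta Lake type are compatible.
///
/// Scalar types map one-to-one (`Int16`↔`Short`, `Int32`↔`Integer`,
/// `Int64`↔`Long`, `Float32`↔`Float`, `Float64`↔`Double`,
/// `Varchar`↔`String`, `Timestamptz`↔`Timestamp`, and so on). Structs must
/// match field-by-field, in order and by name; lists must have a compatible
/// element type. Types outside this mapping are rejected with an error rather
/// than returning `false`.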
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_, _))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list, dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "DeltaLake sink does not support type {:?}",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

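/// The sink is coordinated: each parallel writer flushes Parquet files at
/// checkpoint barriers and ships the resulting `Add` actions as metadata,
/// while a single coordinator commits them to the Delta log (see
/// [`DeltaLakeSinkCommitter`]).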
impl Sink for DeltaLakeSink {
    type Coordinator = DeltaLakeSinkCommitter;
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &["commit_checkpoint_interval"];
    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk.clone(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

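    /// Validates the configuration against the existing Delta table: the sink
    /// must be append-only, the column counts must match, every RisingWave
    /// column must exist in the table with a compatible type, and
    /// `commit_checkpoint_interval` must be positive.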
    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only delta lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = table
            .get_schema()?
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "column count mismatch: RisingWave has {}, DeltaLake has {}",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type: &&DeltaLakeDataType = deltalake_fields
                .get(&field.name)
                .ok_or_else(|| SinkError::DeltaLake(anyhow!("cannot find field type")))?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} type mismatch: DeltaLake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
        Ok(DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        })
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

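/// Per-parallelism writer. Buffers rows into Arrow record batches via a
/// `RecordBatchWriter` and flushes them to Parquet files at checkpoint
/// barriers, reporting the produced `Add` actions to the coordinator.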
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> =
            Arc::new(convert_schema(dl_table.get_schema()?)?);

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(batch).await?;
        Ok(())
    }
}

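/// Converts a Delta Lake `StructType` into the Arrow schema used when
/// encoding stream chunks, preserving field names and nullability.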
fn convert_schema(schema: &StructType) -> Result<deltalake::arrow::datatypes::Schema> {
    let mut builder = deltalake::arrow::datatypes::SchemaBuilder::new();
    for field in schema.fields() {
        let dl_field = deltalake::arrow::datatypes::Field::new(
            field.name(),
            deltalake::arrow::datatypes::DataType::try_from(field.data_type())
                .context("convert schema error")
                .map_err(SinkError::DeltaLake)?,
            field.is_nullable(),
        );
        builder.push(dl_field);
    }
    Ok(builder.finish())
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        // Flush buffered record batches to Parquet and hand the resulting
        // `Add` actions to the coordinator; the actual Delta log commit
        // happens in `DeltaLakeSinkCommitter::commit`.
        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

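/// Commit coordinator for the Delta Lake sink. Runs as a single instance,
/// collects the `Add` actions flushed by all parallel writers, and commits
/// them to the Delta log as one append transaction per checkpoint.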
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinkCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(None)
    }

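    /// Deserializes the `Add` actions from every writer's metadata and
    /// commits them to the Delta log as a single `SaveMode::Append` write.
    /// An empty action set (no data flushed this checkpoint) is a no-op.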
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::info!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.metadata()?.partition_columns.clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::info!("Committed to DeltaLake table in epoch {epoch}, version {version}.");
        Ok(())
    }
}

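/// The per-checkpoint payload exchanged between writers and the coordinator:
/// the `Add` actions produced by a Parquet flush, serialized as JSON inside
/// [`SinkMetadata`].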
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::prelude::v1::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("cannot deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("cannot create deltalake sink write result from empty data!")
        }
    }
}

#[cfg(all(test, not(madsim)))]
mod test {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkWriter};
    use crate::sink::SinkCommitCoordinator;
    use crate::sink::deltalake::DeltaLakeSinkCommitter;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        // Write one chunk, flush it at a checkpoint barrier, and commit the
        // resulting `Add` actions through the coordinator.
        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit(1, vec![metadata]).await.unwrap();

        // Read the table back through DataFusion and check the row count.
        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(path).await.unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.first().unwrap().column(0).len());
        assert_eq!(3, batches.first().unwrap().column(1).len());
    }
}
701}