use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::datafusion::datasource::TableProvider;
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

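/// Connection and storage options shared by the DeltaLake sink.
///
/// `location` selects the backend by URI scheme: `s3://`/`s3a://` (AWS S3, credentials taken
/// from the flattened [`AwsAuthProps`]), `gs://` (GCS, authenticated via `gcs.service.account`),
/// or `file://` (local filesystem). `commit_checkpoint_interval` controls how many checkpoints
/// elapse between Delta commits and must be greater than zero.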
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
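    /// Opens the Delta table at `self.location`, registering the object-store handlers and
    /// building the storage options that match the URI scheme (S3, GCS, or local file).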
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                let url = Url::parse(&s3_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => {
                let url = Url::parse(&format!("file://{}", local_path))
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table(url).await?
            }
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "gcs.service.account is required with Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                let url = Url::parse(&gcs_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
        };
        Ok(table)
    }

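    /// Classifies `path` by its URI scheme so the caller knows which backend to initialize.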
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

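    /// Builds the object-store options for S3: static credentials resolved from
    /// [`AwsAuthProps`], plus region (defaulting to `DEFAULT_REGION`) and/or endpoint.
    /// At least one of `s3.endpoint` and `s3.region` must be configured.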
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required with AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

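/// The `location` option parsed into its storage backend.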
enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

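/// Full sink configuration: the common DeltaLake options plus the sink `type`
/// (only append-only variants are accepted, see [`DeltaLakeSink::validate`]).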
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

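/// Append-only DeltaLake sink. Rows are written by [`DeltaLakeSinkWriter`] on each parallel
/// writer and committed to the Delta log by a single [`DeltaLakeSinkCommitter`] coordinator.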
#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

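/// Checks that a RisingWave column type is compatible with the corresponding DeltaLake type,
/// recursing into struct fields and list elements. Types without a DeltaLake counterpart
/// produce an error rather than `false`.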
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

impl Sink for DeltaLakeSink {
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only delta lake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let snapshot = table.snapshot()?;
        let delta_schema = snapshot.schema();
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in deltalake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: deltalake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let coordinator = DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        };
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

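/// Per-parallelism writer: converts each [`StreamChunk`] into an Arrow `RecordBatch` and
/// buffers it in a [`RecordBatchWriter`]; buffered files are flushed on checkpoint barriers.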
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema: Arc<deltalake::arrow::datatypes::Schema> = dl_table.schema();

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

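    /// Converts the chunk to an Arrow `RecordBatch` using the table's Arrow schema and
    /// appends it to the buffered writer.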
    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

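    /// On a checkpoint barrier, flush buffered rows and hand the resulting `Add` actions to
    /// the commit coordinator as serialized metadata; non-checkpoint barriers are a no-op.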
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

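/// Single-phase commit coordinator: gathers the `Add` actions reported by all writers for an
/// epoch and commits them to the Delta log as one append transaction.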
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(())
    }

    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self.table.snapshot()?.metadata().partition_columns().clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update().await?;
        tracing::debug!("Committed to DeltaLake table in epoch {epoch}, version {version}.");
        Ok(())
    }
}

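/// Writer-side flush result (the Delta `Add` actions) serialized into [`SinkMetadata`] so it
/// can be shipped to the commit coordinator.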
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize deltalake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("Can't deserialize deltalake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("Can't create deltalake sink write result from empty data!")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;
    use url::Url;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinglePhaseCommitCoordinator;
    use crate::sink::writer::SinkWriter;

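    // End-to-end check against a local Delta table: create the table, write one chunk through
    // the sink writer, commit via the coordinator, then read the rows back with DataFusion.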
    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit_data(1, vec![metadata]).await.unwrap();

        let ctx = deltalake::datafusion::prelude::SessionContext::new();
        let table = deltalake::open_table(Url::parse(&format!("file://{}", path)).unwrap())
            .await
            .unwrap();
        ctx.register_table("demo", std::sync::Arc::new(table))
            .unwrap();

        let batches = ctx
            .sql("SELECT * FROM demo")
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
        assert_eq!(3, batches.get(0).unwrap().column(0).len());
        assert_eq!(3, batches.get(0).unwrap().column(1).len());
    }
}