use core::num::NonZeroU64;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{Context, anyhow};
use async_trait::async_trait;
use deltalake::DeltaTable;
use deltalake::aws::storage::s3_constants::{
    AWS_ACCESS_KEY_ID, AWS_ALLOW_HTTP, AWS_ENDPOINT_URL, AWS_REGION, AWS_S3_ALLOW_UNSAFE_RENAME,
    AWS_SECRET_ACCESS_KEY,
};
use deltalake::kernel::transaction::CommitBuilder;
use deltalake::kernel::{Action, Add, DataType as DeltaLakeDataType, PrimitiveType};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::{DeltaWriter, RecordBatchWriter};
use phf::{Set, phf_set};
use risingwave_common::array::StreamChunk;
use risingwave_common::array::arrow::DeltaLakeConvert;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqDebug;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use tokio::sync::mpsc::UnboundedSender;
use url::Url;
use with_options::WithOptions;

use crate::connector_common::{AwsAuthProps, IcebergSinkCompactionUpdate};
use crate::enforce_secret::{EnforceSecret, EnforceSecretError};
use crate::sink::coordinate::CoordinatedLogSinker;
use crate::sink::decouple_checkpoint_log_sink::default_commit_checkpoint_interval;
use crate::sink::writer::SinkWriter;
use crate::sink::{
    Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
    SinglePhaseCommitCoordinator, Sink, SinkCommitCoordinator, SinkError, SinkParam,
    SinkWriterParam,
};

pub const DEFAULT_REGION: &str = "us-east-1";
pub const GCS_SERVICE_ACCOUNT: &str = "service_account_key";

pub const DELTALAKE_SINK: &str = "deltalake";

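/// Connection options shared by every DeltaLake table location. A minimal
/// sketch of building a config from a property map, mirroring the test module
/// below (the bucket and table path are placeholders):
///
/// ```ignore
/// let properties = maplit::btreemap! {
///     "connector".to_owned() => "deltalake".to_owned(),
///     "type".to_owned() => "append-only".to_owned(),
///     "location".to_owned() => "s3a://some-bucket/some-table".to_owned(),
/// };
/// let config = DeltaLakeConfig::from_btreemap(properties)?;
/// ```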
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct DeltaLakeCommon {
    #[serde(rename = "location")]
    pub location: String,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,

    #[serde(rename = "gcs.service.account")]
    pub gcs_service_account: Option<String>,
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    #[with_option(allow_alter_on_fly)]
    pub commit_checkpoint_interval: u64,
}

impl EnforceSecret for DeltaLakeCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.service.account",
    };

    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        AwsAuthProps::enforce_one(prop)?;
        if Self::ENFORCE_SECRET_PROPERTIES.contains(prop) {
            return Err(EnforceSecretError {
                key: prop.to_owned(),
            }
            .into());
        }

        Ok(())
    }
}

impl DeltaLakeCommon {
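    /// Opens the Delta table at `self.location`, dispatching on the URL scheme
    /// to register the matching object-store handlers and build the
    /// scheme-specific storage options (S3 credentials or a GCS service
    /// account key).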
    pub async fn create_deltalake_client(&self) -> Result<DeltaTable> {
        let table = match Self::get_table_url(&self.location)? {
            DeltaTableUrl::S3(s3_path) => {
                let storage_options = self.build_delta_lake_config_for_aws().await?;
                deltalake::aws::register_handlers(None);
                let url = Url::parse(&s3_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
            DeltaTableUrl::Local(local_path) => {
                let url = Url::parse(&format!("file://{}", local_path))
                    .map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table(url).await?
            }
            DeltaTableUrl::Gcs(gcs_path) => {
                let mut storage_options = HashMap::new();
                storage_options.insert(
                    GCS_SERVICE_ACCOUNT.to_owned(),
                    self.gcs_service_account.clone().ok_or_else(|| {
                        SinkError::Config(anyhow!(
                            "`gcs.service.account` is required when sinking to Google Cloud Storage (GCS)"
                        ))
                    })?,
                );
                deltalake::gcp::register_handlers(None);
                let url = Url::parse(&gcs_path).map_err(|e| SinkError::DeltaLake(anyhow!(e)))?;
                deltalake::open_table_with_storage_options(url, storage_options).await?
            }
        };
        Ok(table)
    }

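    /// Classifies `path` by its scheme prefix; only `s3://`/`s3a://`, `gs://`,
    /// and `file://` locations are accepted.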
    fn get_table_url(path: &str) -> Result<DeltaTableUrl> {
        if path.starts_with("s3://") || path.starts_with("s3a://") {
            Ok(DeltaTableUrl::S3(path.to_owned()))
        } else if path.starts_with("gs://") {
            Ok(DeltaTableUrl::Gcs(path.to_owned()))
        } else if let Some(path) = path.strip_prefix("file://") {
            Ok(DeltaTableUrl::Local(path.to_owned()))
        } else {
            Err(SinkError::DeltaLake(anyhow!(
                "path should start with 's3://' or 's3a://' (S3), 'gs://' (GCS), or 'file://' (local)"
            )))
        }
    }

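    /// Builds the object-store options for S3-compatible storage: static
    /// credentials resolved from `aws_auth_props`, plus `allow_http` (to
    /// permit non-TLS endpoints) and `allow_unsafe_rename` (to permit commits
    /// without an external locking provider).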
    async fn build_delta_lake_config_for_aws(&self) -> Result<HashMap<String, String>> {
        let mut storage_options = HashMap::new();
        storage_options.insert(AWS_ALLOW_HTTP.to_owned(), "true".to_owned());
        storage_options.insert(AWS_S3_ALLOW_UNSAFE_RENAME.to_owned(), "true".to_owned());
        let sdk_config = self.aws_auth_props.build_config().await?;
        let credentials = sdk_config
            .credentials_provider()
            .ok_or_else(|| {
                SinkError::Config(anyhow!(
                    "s3.access.key and s3.secret.key are required for AWS S3"
                ))
            })?
            .as_ref()
            .provide_credentials()
            .await
            .map_err(|e| SinkError::Config(e.into()))?;
        let region = sdk_config.region();
        let endpoint = sdk_config.endpoint_url();
        storage_options.insert(
            AWS_ACCESS_KEY_ID.to_owned(),
            credentials.access_key_id().to_owned(),
        );
        storage_options.insert(
            AWS_SECRET_ACCESS_KEY.to_owned(),
            credentials.secret_access_key().to_owned(),
        );
        if endpoint.is_none() && region.is_none() {
            return Err(SinkError::Config(anyhow!(
                "at least one of s3.endpoint and s3.region must be set"
            )));
        }
        storage_options.insert(
            AWS_REGION.to_owned(),
            region
                .map(|r| r.as_ref().to_owned())
                .unwrap_or_else(|| DEFAULT_REGION.to_owned()),
        );
        if let Some(s3_endpoint) = endpoint {
            storage_options.insert(AWS_ENDPOINT_URL.to_owned(), s3_endpoint.to_owned());
        }
        Ok(storage_options)
    }
}

enum DeltaTableUrl {
    S3(String),
    Local(String),
    Gcs(String),
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct DeltaLakeConfig {
    #[serde(flatten)]
    pub common: DeltaLakeCommon,

    pub r#type: String,
}

impl EnforceSecret for DeltaLakeConfig {
    fn enforce_one(prop: &str) -> crate::error::ConnectorResult<()> {
        DeltaLakeCommon::enforce_one(prop)
    }
}

impl DeltaLakeConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config = serde_json::from_value::<DeltaLakeConfig>(
            serde_json::to_value(properties).map_err(|e| SinkError::DeltaLake(e.into()))?,
        )
        .map_err(|e| SinkError::Config(anyhow!(e)))?;
        Ok(config)
    }
}

#[derive(Debug)]
pub struct DeltaLakeSink {
    pub config: DeltaLakeConfig,
    param: SinkParam,
}

impl EnforceSecret for DeltaLakeSink {
    fn enforce_secret<'a>(
        prop_iter: impl Iterator<Item = &'a str>,
    ) -> crate::error::ConnectorResult<()> {
        for prop in prop_iter {
            DeltaLakeCommon::enforce_one(prop)?;
        }
        Ok(())
    }
}

impl DeltaLakeSink {
    pub fn new(config: DeltaLakeConfig, param: SinkParam) -> Result<Self> {
        Ok(Self { config, param })
    }
}

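/// Checks that a RisingWave column type is compatible with the corresponding
/// Delta Lake field type. The mapping is one-to-one for primitives (Boolean,
/// Int16, Int32, Int64, Float32, Float64, Decimal, Date, Varchar,
/// Timestamptz) and recurses structurally for structs (field names matched
/// positionally) and lists.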
fn check_field_type(rw_data_type: &DataType, dl_data_type: &DeltaLakeDataType) -> Result<bool> {
    let result = match rw_data_type {
        DataType::Boolean => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Boolean)
            )
        }
        DataType::Int16 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Short)
            )
        }
        DataType::Int32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Integer)
            )
        }
        DataType::Int64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Long)
            )
        }
        DataType::Float32 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Float)
            )
        }
        DataType::Float64 => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Double)
            )
        }
        DataType::Decimal => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Decimal(_))
            )
        }
        DataType::Date => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Date)
            )
        }
        DataType::Varchar => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::String)
            )
        }
        DataType::Timestamptz => {
            matches!(
                dl_data_type,
                DeltaLakeDataType::Primitive(PrimitiveType::Timestamp)
            )
        }
        DataType::Struct(rw_struct) => {
            if let DeltaLakeDataType::Struct(dl_struct) = dl_data_type {
                let mut result = true;
                for ((rw_name, rw_type), dl_field) in
                    rw_struct.iter().zip_eq_debug(dl_struct.fields())
                {
                    result = check_field_type(rw_type, dl_field.data_type())?
                        && result
                        && rw_name.eq(dl_field.name());
                }
                result
            } else {
                false
            }
        }
        DataType::List(rw_list) => {
            if let DeltaLakeDataType::Array(dl_list) = dl_data_type {
                check_field_type(rw_list.elem(), dl_list.element_type())?
            } else {
                false
            }
        }
        _ => {
            return Err(SinkError::DeltaLake(anyhow!(
                "Type {:?} is not supported for DeltaLake sink.",
                rw_data_type.to_owned()
            )));
        }
    };
    Ok(result)
}

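// Sink lifecycle: `validate` checks the config and compares the RisingWave
// schema against the Delta table schema, `new_log_sinker` builds a
// checkpoint-decoupled writer, and `new_coordinator` creates the committer
// that turns flushed `Add` actions into Delta commits.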
impl Sink for DeltaLakeSink {
    type LogSinker = CoordinatedLogSinker<DeltaLakeSinkWriter>;

    const SINK_NAME: &'static str = DELTALAKE_SINK;

    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let inner = DeltaLakeSinkWriter::new(
            self.config.clone(),
            self.param.schema().clone(),
            self.param.downstream_pk_or_empty(),
        )
        .await?;

        let commit_checkpoint_interval =
            NonZeroU64::new(self.config.common.commit_checkpoint_interval).expect(
                "commit_checkpoint_interval should be greater than 0, and it should be checked in config validation",
            );

        let writer = CoordinatedLogSinker::new(
            &writer_param,
            self.param.clone(),
            inner,
            commit_checkpoint_interval,
        )
        .await?;

        Ok(writer)
    }

    fn validate_alter_config(config: &BTreeMap<String, String>) -> Result<()> {
        DeltaLakeConfig::from_btreemap(config.clone())?;
        Ok(())
    }

    async fn validate(&self) -> Result<()> {
        if self.config.r#type != SINK_TYPE_APPEND_ONLY
            && self.config.r#type != SINK_USER_FORCE_APPEND_ONLY_OPTION
        {
            return Err(SinkError::Config(anyhow!(
                "only append-only DeltaLake sink is supported",
            )));
        }
        let table = self.config.common.create_deltalake_client().await?;
        let snapshot = table.snapshot()?;
        let delta_schema = snapshot.schema();
        let deltalake_fields: HashMap<&String, &DeltaLakeDataType> = delta_schema
            .fields()
            .map(|f| (f.name(), f.data_type()))
            .collect();
        if deltalake_fields.len() != self.param.schema().fields().len() {
            return Err(SinkError::DeltaLake(anyhow!(
                "Columns mismatch. RisingWave schema has {} fields, DeltaLake schema has {} fields",
                self.param.schema().fields().len(),
                deltalake_fields.len()
            )));
        }
        for field in self.param.schema().fields() {
            if !deltalake_fields.contains_key(&field.name) {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column {} not found in DeltaLake table",
                    field.name
                )));
            }
            let deltalake_field_type = deltalake_fields.get(&field.name).ok_or_else(|| {
                SinkError::DeltaLake(anyhow!("cannot find field type for {}", field.name))
            })?;
            if !check_field_type(&field.data_type, deltalake_field_type)? {
                return Err(SinkError::DeltaLake(anyhow!(
                    "column '{}' type mismatch: DeltaLake type is {:?}, RisingWave type is {:?}",
                    field.name,
                    deltalake_field_type,
                    field.data_type
                )));
            }
        }
        if self.config.common.commit_checkpoint_interval == 0 {
            return Err(SinkError::Config(anyhow!(
                "`commit_checkpoint_interval` must be greater than 0"
            )));
        }
        Ok(())
    }

    fn is_coordinated_sink(&self) -> bool {
        true
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        let coordinator = DeltaLakeSinkCommitter {
            table: self.config.common.create_deltalake_client().await?,
        };
        Ok(SinkCommitCoordinator::SinglePhase(Box::new(coordinator)))
    }
}

impl TryFrom<SinkParam> for DeltaLakeSink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let config = DeltaLakeConfig::from_btreemap(param.properties.clone())?;
        DeltaLakeSink::new(config, param)
    }
}

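/// Per-parallelism writer: converts each `StreamChunk` into an Arrow
/// `RecordBatch` and buffers it in delta-rs's `RecordBatchWriter`; buffered
/// data is flushed into `Add` actions only at checkpoint barriers.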
pub struct DeltaLakeSinkWriter {
    pub config: DeltaLakeConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    writer: RecordBatchWriter,
    dl_schema: Arc<deltalake::arrow::datatypes::Schema>,
    #[expect(dead_code)]
    dl_table: DeltaTable,
}

impl DeltaLakeSinkWriter {
    pub async fn new(
        config: DeltaLakeConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
    ) -> Result<Self> {
        let dl_table = config.common.create_deltalake_client().await?;
        let writer = RecordBatchWriter::for_table(&dl_table)?;
        let dl_schema = writer.arrow_schema();

        Ok(Self {
            config,
            schema,
            pk_indices,
            writer,
            dl_schema,
            dl_table,
        })
    }

    async fn write(&mut self, chunk: StreamChunk) -> Result<()> {
        let record_batch = DeltaLakeConvert
            .to_record_batch(self.dl_schema.clone(), &chunk)
            .context("convert record batch error")
            .map_err(SinkError::DeltaLake)?;
        self.writer.write(record_batch).await?;
        Ok(())
    }
}

#[async_trait]
impl SinkWriter for DeltaLakeSinkWriter {
    type CommitMetadata = Option<SinkMetadata>;

    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.write(chunk).await
    }

    async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
        Ok(())
    }

    async fn abort(&mut self) -> Result<()> {
        Ok(())
    }

    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Option<SinkMetadata>> {
        if !is_checkpoint {
            return Ok(None);
        }

        let adds = self.writer.flush().await?;
        Ok(Some(SinkMetadata::try_from(&DeltaLakeWriteResult {
            adds,
        })?))
    }
}

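/// Single-phase commit coordinator: collects the serialized `Add` actions
/// flushed by every writer for an epoch and commits them to the Delta log as
/// a single append transaction, so one checkpoint produces at most one new
/// table version.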
pub struct DeltaLakeSinkCommitter {
    table: DeltaTable,
}

#[async_trait::async_trait]
impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
    async fn init(&mut self) -> Result<()> {
        tracing::info!("DeltaLake commit coordinator initialized.");
        Ok(())
    }

    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()> {
        tracing::debug!("Starting DeltaLake commit in epoch {epoch}.");

        let deltalake_write_result = metadata
            .iter()
            .map(DeltaLakeWriteResult::try_from)
            .collect::<Result<Vec<DeltaLakeWriteResult>>>()?;
        let write_adds: Vec<Action> = deltalake_write_result
            .into_iter()
            .flat_map(|v| v.adds.into_iter())
            .map(Action::Add)
            .collect();

        if write_adds.is_empty() {
            return Ok(());
        }
        let partition_cols = self
            .table
            .snapshot()?
            .metadata()
            .partition_columns()
            .clone();
        let partition_by = if !partition_cols.is_empty() {
            Some(partition_cols)
        } else {
            None
        };
        let operation = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by,
            predicate: None,
        };
        let version = CommitBuilder::default()
            .with_actions(write_adds)
            .build(
                Some(self.table.snapshot()?),
                self.table.log_store().clone(),
                operation,
            )
            .await?
            .version();
        self.table.update_state().await?;
        tracing::debug!(
            "Committed to DeltaLake table in epoch {epoch}, version {version}."
        );
        Ok(())
    }
}

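/// The writer's flushed `Add` actions travel to the coordinator inside
/// `SinkMetadata` as a serde_json-encoded byte blob; the two `try_from`
/// impls below form the encode/decode pair for that round trip.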
#[derive(Serialize, Deserialize)]
struct DeltaLakeWriteResult {
    adds: Vec<Add>,
}

impl<'a> TryFrom<&'a DeltaLakeWriteResult> for SinkMetadata {
    type Error = SinkError;

    fn try_from(value: &'a DeltaLakeWriteResult) -> std::result::Result<Self, Self::Error> {
        let metadata =
            serde_json::to_vec(&value.adds).context("cannot serialize DeltaLake sink metadata")?;
        Ok(SinkMetadata {
            metadata: Some(Serialized(SerializedMetadata { metadata })),
        })
    }
}

impl DeltaLakeWriteResult {
    fn try_from(value: &SinkMetadata) -> Result<Self> {
        if let Some(Serialized(v)) = &value.metadata {
            let adds = serde_json::from_slice::<Vec<Add>>(&v.metadata)
                .context("cannot deserialize DeltaLake sink metadata")?;
            Ok(DeltaLakeWriteResult { adds })
        } else {
            bail!("cannot create DeltaLake sink write result from empty metadata")
        }
    }
}

impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

#[cfg(all(test, not(madsim)))]
mod tests {
    use deltalake::kernel::DataType as SchemaDataType;
    use deltalake::operations::create::CreateBuilder;
    use maplit::btreemap;
    use risingwave_common::array::{Array, I32Array, Op, StreamChunk, Utf8Array};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::{DeltaLakeConfig, DeltaLakeSinkCommitter, DeltaLakeSinkWriter};
    use crate::sink::SinglePhaseCommitCoordinator;
    use crate::sink::writer::SinkWriter;

    #[tokio::test]
    async fn test_deltalake() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().to_str().unwrap();
        CreateBuilder::new()
            .with_location(path)
            .with_column(
                "id",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
                false,
                Default::default(),
            )
            .with_column(
                "name",
                SchemaDataType::Primitive(deltalake::kernel::PrimitiveType::String),
                false,
                Default::default(),
            )
            .await
            .unwrap();

        let properties = btreemap! {
            "connector".to_owned() => "deltalake".to_owned(),
            "force_append_only".to_owned() => "true".to_owned(),
            "type".to_owned() => "append-only".to_owned(),
            "location".to_owned() => format!("file://{}", path),
        };

        let schema = Schema::new(vec![
            Field {
                data_type: DataType::Int32,
                name: "id".into(),
            },
            Field {
                data_type: DataType::Varchar,
                name: "name".into(),
            },
        ]);

        let deltalake_config = DeltaLakeConfig::from_btreemap(properties).unwrap();
        let deltalake_table = deltalake_config
            .common
            .create_deltalake_client()
            .await
            .unwrap();

        let mut deltalake_writer = DeltaLakeSinkWriter::new(deltalake_config, schema, vec![0])
            .await
            .unwrap();
        let chunk = StreamChunk::new(
            vec![Op::Insert, Op::Insert, Op::Insert],
            vec![
                I32Array::from_iter(vec![1, 2, 3]).into_ref(),
                Utf8Array::from_iter(vec!["Alice", "Bob", "Clare"]).into_ref(),
            ],
        );
        deltalake_writer.write(chunk).await.unwrap();
        let mut committer = DeltaLakeSinkCommitter {
            table: deltalake_table,
        };
        let metadata = deltalake_writer.barrier(true).await.unwrap().unwrap();
        committer.commit_data(1, vec![metadata]).await.unwrap();
        let snapshot = committer.table.snapshot().unwrap();
        assert_eq!(1, snapshot.log_data().num_files());
    }
}
685}