pub mod big_query;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
pub mod google_pubsub;
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
        SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::clickhouse::error::Error as ClickHouseError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use futures::future::BoxFuture;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    bail, register_guarded_histogram_vec_with_registry,
    register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::writer::SinkWriter;

const BOUNDED_CHANNEL_SIZE: usize = 16;

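/// Invokes `$macro` with the full list of `{ Variant, SinkType }` pairs for every supported sink
/// connector. The other sink macros in this module (`dispatch_sink!`, `match_sink_name_str!`,
/// `def_sink_impl!`) are all defined in terms of this list.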
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink },
                { Kafka, $crate::sink::kafka::KafkaSink },
                { Pulsar, $crate::sink::pulsar::PulsarSink },
                { BlackHole, $crate::sink::trivial::BlackHoleSink },
                { Kinesis, $crate::sink::kinesis::KinesisSink },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
                { Iceberg, $crate::sink::iceberg::IcebergSink },
                { Mqtt, $crate::sink::mqtt::MqttSink },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink },
                { Nats, $crate::sink::nats::NatsSink },
                { Jdbc, $crate::sink::remote::JdbcSink },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink },
                { Cassandra, $crate::sink::remote::CassandraSink },
                { Doris, $crate::sink::doris::DorisSink },
                { Starrocks, $crate::sink::starrocks::StarrocksSink },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink> },
                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink> },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink> },
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink> },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink> },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
                { BigQuery, $crate::sink::big_query::BigQuerySink },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink },
                { Mongodb, $crate::sink::mongodb::MongodbSink },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink },
                { Postgres, $crate::sink::postgres::PostgresSink },

                { Test, $crate::sink::test_sink::TestSink },
                { Table, $crate::sink::trivial::TableSink }
            }
            $(,$arg)*
        }
    };
}

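/// Expands to a `match` over every `SinkImpl` variant listed in [`for_all_sinks!`], binding the
/// inner sink to `$sink` and evaluating `$body` for the matched variant.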
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}

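/// Matches a connector name string against `<SinkType>::SINK_NAME` for every sink listed in
/// [`for_all_sinks!`]. On a match, the concrete sink type is aliased as `$type_name` and `$body`
/// is evaluated; otherwise `$on_other_closure` is called with the unrecognized name.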
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

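/// Parameters needed to build a sink at runtime. A `SinkParam` can be converted to and from the
/// protobuf [`PbSinkParam`], or derived from a [`SinkCatalog`] with secret references resolved.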
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: table_schema
                .pk_indices
                .iter()
                .map(|i| *i as usize)
                .collect(),
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

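/// Sink metrics registered against the global Prometheus registry, initialized lazily on first
/// access.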
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
        }
    }
}

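/// Per-writer parameters passed to [`Sink::new_log_sinker`] when a sink writer is created on a
/// streaming actor.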
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

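/// The meta client used by coordinated sinks: either a real [`MetaClient`] or a mock
/// implementation for tests.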
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1,
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
    )
}

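/// The trait implemented by every sink connector.
///
/// A sink is constructed from a [`SinkParam`] (via `TryFrom`), validated with [`Sink::validate`],
/// and then writes data through its [`Sink::LogSinker`]. Sinks that commit through a centralized
/// coordinator additionally provide a [`Sink::Coordinator`].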
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;
    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[];
    type LogSinker: LogSinker;
    type Coordinator: SinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                        );
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    #[expect(clippy::unused_async)]
    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}

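/// Read side of the sink log store, consumed by [`LogSinker`] implementations. After an item has
/// been handled, the sinker is expected to acknowledge it via [`SinkLogReader::truncate`] so that
/// the log store can reclaim it.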
pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

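/// Drives a sink by consuming items from a [`SinkLogReader`] and writing them to the external
/// system. The return type `Result<!>` means the future only resolves on error; a log sinker
/// otherwise runs for as long as the sink exists.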
#[async_trait]
pub trait LogSinker: 'static + Send {
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}

pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

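/// A coordinator that commits an epoch on behalf of all parallel writers of a sink, using the
/// [`SinkMetadata`] each writer reports for that epoch.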
#[async_trait]
pub trait SinkCommitCoordinator {
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;

    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}

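/// A no-op commit coordinator for sinks that do not need coordination.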
pub struct DummySinkCommitCoordinator;

#[async_trait]
impl SinkCommitCoordinator for DummySinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
        Ok(())
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

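/// The error type returned by sinks and the sink-related machinery in this module.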
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<ClickHouseError> for SinkError {
    fn from(value: ClickHouseError) -> Self {
        SinkError::ClickHouse(value.to_report_string())
    }
}

#[cfg(feature = "sink-deltalake")]
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}