1pub mod big_query;
15pub mod boxed;
16pub mod catalog;
17pub mod clickhouse;
18pub mod coordinate;
19pub mod decouple_checkpoint_log_sink;
20pub mod deltalake;
21pub mod doris;
22pub mod doris_starrocks_connector;
23pub mod dynamodb;
24pub mod elasticsearch_opensearch;
25pub mod encoder;
26pub mod file_sink;
27pub mod formatter;
28pub mod google_pubsub;
29pub mod iceberg;
30pub mod kafka;
31pub mod kinesis;
32use risingwave_common::bail;
33pub mod log_store;
34pub mod mock_coordination_client;
35pub mod mongodb;
36pub mod mqtt;
37pub mod nats;
38pub mod postgres;
39pub mod pulsar;
40pub mod redis;
41pub mod remote;
42pub mod sqlserver;
43pub mod starrocks;
44pub mod test_sink;
45pub mod trivial;
46pub mod utils;
47pub mod writer;
48
49use std::collections::BTreeMap;
50use std::future::Future;
51use std::sync::{Arc, LazyLock};
52
53use ::clickhouse::error::Error as ClickHouseError;
54use ::deltalake::DeltaTableError;
55use ::redis::RedisError;
56use anyhow::anyhow;
57use async_trait::async_trait;
58use clickhouse::CLICKHOUSE_SINK;
59use decouple_checkpoint_log_sink::{
60 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
61 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
62};
63use deltalake::DELTALAKE_SINK;
64use futures::future::BoxFuture;
65use iceberg::ICEBERG_SINK;
66use opendal::Error as OpendalError;
67use prometheus::Registry;
68use risingwave_common::array::ArrayError;
69use risingwave_common::bitmap::Bitmap;
70use risingwave_common::catalog::{ColumnDesc, Field, Schema};
71use risingwave_common::config::StreamingConfig;
72use risingwave_common::hash::ActorId;
73use risingwave_common::metrics::{
74 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
75 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
76};
77use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
78use risingwave_common::secret::{LocalSecretManager, SecretError};
79use risingwave_common::session_config::sink_decouple::SinkDecouple;
80use risingwave_common::{
81 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
82 register_guarded_int_gauge_vec_with_registry,
83};
84use risingwave_pb::catalog::PbSinkType;
85use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
86use risingwave_rpc_client::MetaClient;
87use risingwave_rpc_client::error::RpcError;
88use sea_orm::DatabaseConnection;
89use starrocks::STARROCKS_SINK;
90use thiserror::Error;
91use thiserror_ext::AsReport;
92use tokio::sync::mpsc::UnboundedReceiver;
93pub use tracing;
94
95use self::catalog::{SinkFormatDesc, SinkType};
96use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
97use crate::WithPropertiesExt;
98use crate::error::{ConnectorError, ConnectorResult};
99use crate::sink::catalog::desc::SinkDesc;
100use crate::sink::catalog::{SinkCatalog, SinkId};
101use crate::sink::file_sink::fs::FsSink;
102use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
103use crate::sink::writer::SinkWriter;
104
// Capacity constant for bounded channels.
// NOTE(review): not referenced in the visible part of this file — confirm which channel(s)
// it sizes before changing the value.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes `$macro!` with the complete list of `{ VariantName, SinkType }` pairs, followed by
/// any extra token-tree arguments supplied by the caller.
///
/// This list is the single registry of sink connectors in this file: `dispatch_sink!`,
/// `match_sink_name_str!` and `def_sink_impl!` are all driven by it, so adding an entry here
/// adds a `SinkImpl` variant, a dispatch arm and a name-match arm at once.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink },
                { Kafka, $crate::sink::kafka::KafkaSink },
                { Pulsar, $crate::sink::pulsar::PulsarSink },
                { BlackHole, $crate::sink::trivial::BlackHoleSink },
                { Kinesis, $crate::sink::kinesis::KinesisSink },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
                { Iceberg, $crate::sink::iceberg::IcebergSink },
                { Mqtt, $crate::sink::mqtt::MqttSink },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink },
                { Nats, $crate::sink::nats::NatsSink },
                { Jdbc, $crate::sink::remote::JdbcSink },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink },
                { Cassandra, $crate::sink::remote::CassandraSink },
                { Doris, $crate::sink::doris::DorisSink },
                { Starrocks, $crate::sink::starrocks::StarrocksSink },
                // File sinks: one generic `FileSink` instantiated per storage backend.
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>},

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>},
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>},

                // NOTE: `FsSink` is not `$crate`-qualified, so it must be in scope wherever this
                // macro is expanded.
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink> },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>},
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
                { BigQuery, $crate::sink::big_query::BigQuerySink },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink },
                { Mongodb, $crate::sink::mongodb::MongodbSink },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink },
                { Postgres, $crate::sink::postgres::PostgresSink },

                { Test, $crate::sink::test_sink::TestSink },
                { Table, $crate::sink::trivial::TableSink }
            }
            $(,$arg)*
        }
    };
}
151
/// Evaluates `$body` with `$sink` bound to the payload of whichever `SinkImpl` variant `$impl`
/// currently holds.
///
/// Call it through the second (three-argument) arm: `dispatch_sink!(impl_expr, ident, body)`.
/// The first arm is the callback that `for_all_sinks!` invokes with the sink list.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: receives the `{ Variant, Type }` list and emits one match arm per variant.
    ({$({$variant_name:ident, $sink_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Public arm: wraps the expressions in braces so they travel as single token trees and
    // forwards them through `for_all_sinks!`.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
167
/// Matches a connector name string against the `SINK_NAME` of every registered sink type.
///
/// On a match, `$body` is evaluated with `$type_name` declared as a local type alias for the
/// matching sink type. If no sink matches, `$on_other_closure` is called with the unmatched
/// string and its result becomes the value of the whole expression.
///
/// Call it through the second (four-argument) arm; the first arm is the callback that
/// `for_all_sinks!` invokes with the sink list.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: one match arm per registered sink plus the fallback arm.
    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Public arm: braces the expressions into single token trees and forwards them.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
188
/// Property key naming the sink connector; `SinkImpl::new` looks it up to pick the sink type.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key holding the legacy sink type; `SinkParam::from_proto` reads it together with
/// `CONNECTOR_TYPE_KEY` when no explicit format description is present.
pub const SINK_TYPE_OPTION: &str = "type";
/// The `"snapshot"` option name.
/// NOTE(review): the constant name suggests it toggles sinking without backfill — confirm
/// against the code that reads it (not visible here).
pub const SINK_WITHOUT_BACKFILL: &str = "snapshot";
/// The `"append-only"` literal — presumably a value of `SINK_TYPE_OPTION`; TODO confirm.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
/// The `"debezium"` literal — presumably a value of `SINK_TYPE_OPTION`; TODO confirm.
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
/// The `"upsert"` literal — presumably a value of `SINK_TYPE_OPTION`; TODO confirm.
pub const SINK_TYPE_UPSERT: &str = "upsert";
/// The `"force_append_only"` option name.
/// NOTE(review): presumably a user-facing option; its handling is not visible in this file.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
196
/// Parameters describing one sink; convertible to and from the protobuf `PbSinkParam` and
/// consumed by `SinkImpl::new` to build the concrete sink.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    /// Identifier of the sink.
    pub sink_id: SinkId,
    /// Name of the sink.
    pub sink_name: String,
    /// Connector options as string key/value pairs (includes the `connector` key).
    pub properties: BTreeMap<String, String>,
    /// Column descriptors of the sink; `schema()` is derived from these.
    pub columns: Vec<ColumnDesc>,
    /// Primary-key indices for the downstream; serialized as `pk_indices` in the protobuf
    /// table schema. Presumably indices into `columns` — TODO confirm.
    pub downstream_pk: Vec<usize>,
    /// Kind of sink output (see `SinkType`).
    pub sink_type: SinkType,
    /// Optional format/encode description.
    pub format_desc: Option<SinkFormatDesc>,
    /// Name of the database the sink belongs to.
    pub db_name: String,

    /// Name of the object the sink reads from.
    /// NOTE(review): presumably an upstream table / materialized view name — confirm.
    pub sink_from_name: String,
}
215
216impl SinkParam {
217 pub fn from_proto(pb_param: PbSinkParam) -> Self {
218 let table_schema = pb_param.table_schema.expect("should contain table schema");
219 let format_desc = match pb_param.format_desc {
220 Some(f) => f.try_into().ok(),
221 None => {
222 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
223 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
224 match (connector, r#type) {
225 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
226 _ => None,
227 }
228 }
229 };
230 Self {
231 sink_id: SinkId::from(pb_param.sink_id),
232 sink_name: pb_param.sink_name,
233 properties: pb_param.properties,
234 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
235 downstream_pk: table_schema
236 .pk_indices
237 .iter()
238 .map(|i| *i as usize)
239 .collect(),
240 sink_type: SinkType::from_proto(
241 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
242 ),
243 format_desc,
244 db_name: pb_param.db_name,
245 sink_from_name: pb_param.sink_from_name,
246 }
247 }
248
249 pub fn to_proto(&self) -> PbSinkParam {
250 PbSinkParam {
251 sink_id: self.sink_id.sink_id,
252 sink_name: self.sink_name.clone(),
253 properties: self.properties.clone(),
254 table_schema: Some(TableSchema {
255 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
256 pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
257 }),
258 sink_type: self.sink_type.to_proto().into(),
259 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
260 db_name: self.db_name.clone(),
261 sink_from_name: self.sink_from_name.clone(),
262 }
263 }
264
265 pub fn schema(&self) -> Schema {
266 Schema {
267 fields: self.columns.iter().map(Field::from).collect(),
268 }
269 }
270
271 pub fn fill_secret_for_format_desc(
274 format_desc: Option<SinkFormatDesc>,
275 ) -> Result<Option<SinkFormatDesc>> {
276 match format_desc {
277 Some(mut format_desc) => {
278 format_desc.options = LocalSecretManager::global()
279 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
280 Ok(Some(format_desc))
281 }
282 None => Ok(None),
283 }
284 }
285
286 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
288 let columns = sink_catalog
289 .visible_columns()
290 .map(|col| col.column_desc.clone())
291 .collect();
292 let properties_with_secret = LocalSecretManager::global()
293 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
294 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
295 Ok(Self {
296 sink_id: sink_catalog.id,
297 sink_name: sink_catalog.name,
298 properties: properties_with_secret,
299 columns,
300 downstream_pk: sink_catalog.downstream_pk,
301 sink_type: sink_catalog.sink_type,
302 format_desc: format_desc_with_secret,
303 db_name: sink_catalog.db_name,
304 sink_from_name: sink_catalog.sink_from_name,
305 })
306 }
307}
308
309pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
310 use crate::enforce_secret::EnforceSecret;
311
312 let connector = props
313 .get_connector()
314 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
315 let key_iter = props.key_iter();
316 match_sink_name_str!(
317 connector.as_str(),
318 PropType,
319 PropType::enforce_secret(key_iter),
320 |other| bail!("connector '{}' is not supported", other)
321 )
322}
323
/// Process-wide sink metrics, created on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
326
/// Label-guarded metric families for sinks, the log store and the Iceberg writer.
///
/// The descriptions below repeat the help text each family is registered with in
/// `SinkMetrics::new`.
#[derive(Clone)]
pub struct SinkMetrics {
    /// Duration of commit op in sink.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    /// Number of rows received by sink.
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    /// The first write epoch of log store.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    /// The latest write epoch of log store.
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    /// The write rate of rows.
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    /// The latest read epoch of log store.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    /// The read rate of rows.
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    /// Total size of chunks read by log reader.
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    /// Accumulated duration of LogReader to wait for next call to create future.
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    /// The qps of iceberg writer.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    /// The latency of iceberg writer.
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    /// The unflushed data file count of iceberg rolling writer.
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    /// The delete cache num of iceberg position delete writer.
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    /// The partition num of iceberg partition writer.
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    /// The write bytes of iceberg writer.
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
}
351
352impl SinkMetrics {
353 pub fn new(registry: &Registry) -> Self {
354 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
355 "sink_commit_duration",
356 "Duration of commit op in sink",
357 &["actor_id", "connector", "sink_id", "sink_name"],
358 registry
359 )
360 .unwrap();
361
362 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
363 "connector_sink_rows_received",
364 "Number of rows received by sink",
365 &["actor_id", "connector_type", "sink_id", "sink_name"],
366 registry
367 )
368 .unwrap();
369
370 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
371 "log_store_first_write_epoch",
372 "The first write epoch of log store",
373 &["actor_id", "sink_id", "sink_name"],
374 registry
375 )
376 .unwrap();
377
378 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
379 "log_store_latest_write_epoch",
380 "The latest write epoch of log store",
381 &["actor_id", "sink_id", "sink_name"],
382 registry
383 )
384 .unwrap();
385
386 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
387 "log_store_write_rows",
388 "The write rate of rows",
389 &["actor_id", "sink_id", "sink_name"],
390 registry
391 )
392 .unwrap();
393
394 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
395 "log_store_latest_read_epoch",
396 "The latest read epoch of log store",
397 &["actor_id", "connector", "sink_id", "sink_name"],
398 registry
399 )
400 .unwrap();
401
402 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
403 "log_store_read_rows",
404 "The read rate of rows",
405 &["actor_id", "connector", "sink_id", "sink_name"],
406 registry
407 )
408 .unwrap();
409
410 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
411 "log_store_read_bytes",
412 "Total size of chunks read by log reader",
413 &["actor_id", "connector", "sink_id", "sink_name"],
414 registry
415 )
416 .unwrap();
417
418 let log_store_reader_wait_new_future_duration_ns =
419 register_guarded_int_counter_vec_with_registry!(
420 "log_store_reader_wait_new_future_duration_ns",
421 "Accumulated duration of LogReader to wait for next call to create future",
422 &["actor_id", "connector", "sink_id", "sink_name"],
423 registry
424 )
425 .unwrap();
426
427 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
428 "iceberg_write_qps",
429 "The qps of iceberg writer",
430 &["actor_id", "sink_id", "sink_name"],
431 registry
432 )
433 .unwrap();
434
435 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
436 "iceberg_write_latency",
437 "The latency of iceberg writer",
438 &["actor_id", "sink_id", "sink_name"],
439 registry
440 )
441 .unwrap();
442
443 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
444 "iceberg_rolling_unflushed_data_file",
445 "The unflushed data file count of iceberg rolling writer",
446 &["actor_id", "sink_id", "sink_name"],
447 registry
448 )
449 .unwrap();
450
451 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
452 "iceberg_position_delete_cache_num",
453 "The delete cache num of iceberg position delete writer",
454 &["actor_id", "sink_id", "sink_name"],
455 registry
456 )
457 .unwrap();
458
459 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
460 "iceberg_partition_num",
461 "The partition num of iceberg partition writer",
462 &["actor_id", "sink_id", "sink_name"],
463 registry
464 )
465 .unwrap();
466
467 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
468 "iceberg_write_bytes",
469 "The write bytes of iceberg writer",
470 &["actor_id", "sink_id", "sink_name"],
471 registry
472 )
473 .unwrap();
474
475 Self {
476 sink_commit_duration,
477 connector_sink_rows_received,
478 log_store_first_write_epoch,
479 log_store_latest_write_epoch,
480 log_store_write_rows,
481 log_store_latest_read_epoch,
482 log_store_read_rows,
483 log_store_read_bytes,
484 log_store_reader_wait_new_future_duration_ns,
485 iceberg_write_qps,
486 iceberg_write_latency,
487 iceberg_rolling_unflushed_data_file,
488 iceberg_position_delete_cache_num,
489 iceberg_partition_num,
490 iceberg_write_bytes,
491 }
492 }
493}
494
/// Per-writer parameters handed to `Sink::new_log_sinker`.
#[derive(Clone)]
pub struct SinkWriterParam {
    /// Identifier of the executor that owns this writer.
    pub executor_id: u64,
    /// Optional vnode bitmap.
    /// NOTE(review): presumably the vnodes handled by this writer — confirm against the users.
    pub vnode_bitmap: Option<Bitmap>,
    /// Optional meta client (real or mock), used for sink coordination and failure event logs.
    pub meta_client: Option<SinkMetaClient>,
    /// Optional index of an extra partition column.
    /// NOTE(review): semantics are not visible in this file — confirm before relying on it.
    pub extra_partition_col_idx: Option<usize>,

    /// Actor id; used as the `actor_id` metric label.
    pub actor_id: ActorId,
    /// Sink id; used as the `sink_id` metric label.
    pub sink_id: SinkId,
    /// Sink name; used as the `sink_name` metric label.
    pub sink_name: String,
    /// Connector name; used as the connector metric label.
    pub connector: String,
    /// Streaming configuration made available to the writer.
    pub streaming_config: StreamingConfig,
}
513
/// Metric handles of a single sink writer, i.e. the global metric families already bound to
/// that writer's label values.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    /// Histogram taken from the global `sink_commit_duration` family.
    pub sink_commit_duration: LabelGuardedHistogram,
    /// Counter taken from the global `connector_sink_rows_received` family.
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
519
520impl SinkWriterMetrics {
521 pub fn new(writer_param: &SinkWriterParam) -> Self {
522 let labels = [
523 &writer_param.actor_id.to_string(),
524 writer_param.connector.as_str(),
525 &writer_param.sink_id.to_string(),
526 writer_param.sink_name.as_str(),
527 ];
528 let sink_commit_duration = GLOBAL_SINK_METRICS
529 .sink_commit_duration
530 .with_guarded_label_values(&labels);
531 let connector_sink_rows_received = GLOBAL_SINK_METRICS
532 .connector_sink_rows_received
533 .with_guarded_label_values(&labels);
534 Self {
535 sink_commit_duration,
536 connector_sink_rows_received,
537 }
538 }
539
540 #[cfg(test)]
541 pub fn for_test() -> Self {
542 Self {
543 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
544 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
545 }
546 }
547}
548
/// Meta client used by sinks: either the real RPC client or a mock.
#[derive(Clone)]
pub enum SinkMetaClient {
    /// Real meta service client.
    MetaClient(MetaClient),
    /// Mock client; reporting a sink failure through it is a no-op.
    MockMetaClient(MockMetaClient),
}
554
555impl SinkMetaClient {
556 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
557 match self {
558 SinkMetaClient::MetaClient(meta_client) => {
559 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
560 meta_client.sink_coordinate_client().await,
561 )
562 }
563 SinkMetaClient::MockMetaClient(mock_meta_client) => {
564 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
565 mock_meta_client.sink_coordinate_client(),
566 )
567 }
568 }
569 }
570
571 pub async fn add_sink_fail_evet_log(
572 &self,
573 sink_id: u32,
574 sink_name: String,
575 connector: String,
576 error: String,
577 ) {
578 match self {
579 SinkMetaClient::MetaClient(meta_client) => {
580 match meta_client
581 .add_sink_fail_evet(sink_id, sink_name, connector, error)
582 .await
583 {
584 Ok(_) => {}
585 Err(e) => {
586 tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log.");
587 }
588 }
589 }
590 SinkMetaClient::MockMetaClient(_) => {}
591 }
592 }
593}
594
595impl SinkWriterParam {
596 pub fn for_test() -> Self {
597 SinkWriterParam {
598 executor_id: Default::default(),
599 vnode_bitmap: Default::default(),
600 meta_client: Default::default(),
601 extra_partition_col_idx: Default::default(),
602
603 actor_id: 1,
604 sink_id: SinkId::new(1),
605 sink_name: "test_sink".to_owned(),
606 connector: "test_connector".to_owned(),
607 streaming_config: StreamingConfig::default(),
608 }
609 }
610}
611
612fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
613 matches!(
614 sink_name,
615 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
616 )
617}
618pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
619 const SINK_NAME: &'static str;
620 const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[];
621 type LogSinker: LogSinker;
622 type Coordinator: SinkCommitCoordinator;
623
624 fn set_default_commit_checkpoint_interval(
625 desc: &mut SinkDesc,
626 user_specified: &SinkDecouple,
627 ) -> Result<()> {
628 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
629 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
630 Some(commit_checkpoint_interval) => {
631 let commit_checkpoint_interval = commit_checkpoint_interval
632 .parse::<u64>()
633 .map_err(|e| SinkError::Config(anyhow!(e)))?;
634 if matches!(user_specified, SinkDecouple::Disable)
635 && commit_checkpoint_interval > 1
636 {
637 return Err(SinkError::Config(anyhow!(
638 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
639 )));
640 }
641 }
642 None => match user_specified {
643 SinkDecouple::Default | SinkDecouple::Enable => {
644 desc.properties.insert(
645 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
646 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
647 );
648 }
649 SinkDecouple::Disable => {
650 desc.properties.insert(
651 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
652 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
653 );
654 }
655 },
656 }
657 }
658 Ok(())
659 }
660
661 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
663 match user_specified {
664 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
665 SinkDecouple::Disable => Ok(false),
666 }
667 }
668
669 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
670 Ok(())
671 }
672
673 async fn validate(&self) -> Result<()>;
674 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
675
676 fn is_coordinated_sink(&self) -> bool {
677 false
678 }
679
680 #[expect(clippy::unused_async)]
681 async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
682 Err(SinkError::Coordinator(anyhow!("no coordinator")))
683 }
684}
685
/// Reader interface handed to a `LogSinker`; implemented below for `&mut R` where
/// `R: LogReader` by forwarding to the corresponding `LogReader` methods.
pub trait SinkLogReader: Send {
    /// Starts reading from `start_offset`.
    /// NOTE(review): the meaning of `None` is not visible here — presumably "use the reader's
    /// own starting point"; confirm against `LogReader::start_from`.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Returns the next item together with a `u64`.
    /// NOTE(review): the `u64` is presumably the item's epoch — confirm against `LogReader`.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log at `offset`.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
702
703impl<R: LogReader> SinkLogReader for &mut R {
704 fn next_item(
705 &mut self,
706 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
707 <R as LogReader>::next_item(*self)
708 }
709
710 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
711 <R as LogReader>::truncate(*self, offset)
712 }
713
714 fn start_from(
715 &mut self,
716 start_offset: Option<u64>,
717 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
718 <R as LogReader>::start_from(*self, start_offset)
719 }
720}
721
/// Consumer side of the log store: reads items from a `SinkLogReader` and sinks them.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Consumes the log and writes it out. The success type is `!`, so this only ever returns
    /// with an error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared callback that, given a `SinkId`, yields a future resolving to a `u64` plus an
/// unbounded receiver of `u64`s.
/// NOTE(review): presumably the current committed epoch and a stream of subsequent committed
/// epochs — confirm against the implementation that constructs it.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
733
/// Coordinator that commits sink metadata per epoch.
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initializes the coordinator with a committed-epoch subscriber.
    /// NOTE(review): the returned `Option<u64>` is presumably an epoch to resume from —
    /// confirm against the callers.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// Commits the metadata collected for `epoch`.
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}
744
/// Coordinator that does nothing; both operations succeed immediately.
pub struct DummySinkCommitCoordinator;

#[async_trait]
impl SinkCommitCoordinator for DummySinkCommitCoordinator {
    /// Ignores the subscriber and returns `Ok(None)`.
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    /// Ignores the epoch and metadata and returns `Ok(())`.
    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
        Ok(())
    }
}
757
758impl SinkImpl {
759 pub fn new(mut param: SinkParam) -> Result<Self> {
760 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
761
762 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
764
765 let sink_type = param
766 .properties
767 .get(CONNECTOR_TYPE_KEY)
768 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
769
770 let sink_type = sink_type.to_lowercase();
771 match_sink_name_str!(
772 sink_type.as_str(),
773 SinkType,
774 Ok(SinkType::try_from(param)?.into()),
775 |other| {
776 Err(SinkError::Config(anyhow!(
777 "unsupported sink connector {}",
778 other
779 )))
780 }
781 )
782 }
783
784 pub fn is_sink_into_table(&self) -> bool {
785 matches!(self, SinkImpl::Table(_))
786 }
787
788 pub fn is_blackhole(&self) -> bool {
789 matches!(self, SinkImpl::BlackHole(_))
790 }
791
792 pub fn is_coordinated_sink(&self) -> bool {
793 dispatch_sink!(self, sink, sink.is_coordinated_sink())
794 }
795}
796
/// Builds a sink from `param`; a thin wrapper around `SinkImpl::new` with the same errors.
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
800
// Generates the `SinkImpl` enum with one boxed variant per entry of `for_all_sinks!`, plus a
// `From<ConcreteSink> for SinkImpl` conversion for every sink type.
macro_rules! def_sink_impl {
    // Entry point: fetch the sink list and re-enter through the arm below.
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
824
/// Result type of sink operations, with `SinkError` as the error type.
pub type Result<T> = std::result::Result<T, SinkError>;
826
/// Error type of all sink operations.
///
/// Variants marked `#[from]` get a `From` conversion generated by `thiserror`, so they can be
/// produced with `?`. Several other source error types are converted by hand-written `From`
/// impls in this file.
#[derive(Error, Debug)]
pub enum SinkError {
    /// Error from the Kafka client.
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    /// Kinesis sink failure.
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Remote sink failure; RPC errors are also converted into this variant.
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Encoding failure, carried as a message.
    #[error("Encode error: {0}")]
    Encode(String),
    /// Error from the Avro library.
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    /// Iceberg sink failure; `sea_orm::DbErr` is also converted into this variant.
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Invalid or missing sink configuration.
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Commit coordinator failure (also used when a sink has no coordinator).
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// ClickHouse failure, carried as a report string.
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    /// Redis failure, carried as a report string.
    #[error("Redis error: {0}")]
    Redis(String),
    /// MQTT sink failure.
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// NATS sink failure.
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Google Pub/Sub sink failure.
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Connection failure shared by the Doris and StarRocks sinks.
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Doris failure, carried as a message.
    #[error("Doris error: {0}")]
    Doris(String),
    /// Delta Lake sink failure; `DeltaTableError` is converted into this variant.
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Failure shared by the ElasticSearch and OpenSearch sinks.
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// StarRocks failure, carried as a message.
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    /// File sink failure; OpenDAL, Parquet and array errors are converted into this variant.
    #[error("File error: {0}")]
    File(String),
    /// Pulsar sink failure.
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Any other `anyhow::Error`; displayed transparently.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    /// BigQuery sink failure.
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// DynamoDB sink failure.
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// SQL Server sink failure; `tiberius` errors are converted into this variant.
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Postgres sink failure; `tokio_postgres` errors are converted into this variant.
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Generic connector error; displayed transparently.
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    /// Failure while resolving secrets.
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    /// MongoDB sink failure.
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
966
967impl From<sea_orm::DbErr> for SinkError {
968 fn from(err: sea_orm::DbErr) -> Self {
969 SinkError::Iceberg(anyhow!(err))
970 }
971}
972
973impl From<OpendalError> for SinkError {
974 fn from(error: OpendalError) -> Self {
975 SinkError::File(error.to_report_string())
976 }
977}
978
979impl From<parquet::errors::ParquetError> for SinkError {
980 fn from(error: parquet::errors::ParquetError) -> Self {
981 SinkError::File(error.to_report_string())
982 }
983}
984
985impl From<ArrayError> for SinkError {
986 fn from(error: ArrayError) -> Self {
987 SinkError::File(error.to_report_string())
988 }
989}
990
991impl From<RpcError> for SinkError {
992 fn from(value: RpcError) -> Self {
993 SinkError::Remote(anyhow!(value))
994 }
995}
996
997impl From<ClickHouseError> for SinkError {
998 fn from(value: ClickHouseError) -> Self {
999 SinkError::ClickHouse(value.to_report_string())
1000 }
1001}
1002
1003impl From<DeltaTableError> for SinkError {
1004 fn from(value: DeltaTableError) -> Self {
1005 SinkError::DeltaLake(anyhow!(value))
1006 }
1007}
1008
1009impl From<RedisError> for SinkError {
1010 fn from(value: RedisError) -> Self {
1011 SinkError::Redis(value.to_report_string())
1012 }
1013}
1014
1015impl From<tiberius::error::Error> for SinkError {
1016 fn from(err: tiberius::error::Error) -> Self {
1017 SinkError::SqlServer(anyhow!(err))
1018 }
1019}
1020
1021impl From<::elasticsearch::Error> for SinkError {
1022 fn from(err: ::elasticsearch::Error) -> Self {
1023 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1024 }
1025}
1026
1027impl From<::opensearch::Error> for SinkError {
1028 fn from(err: ::opensearch::Error) -> Self {
1029 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1030 }
1031}
1032
1033impl From<tokio_postgres::Error> for SinkError {
1034 fn from(err: tokio_postgres::Error) -> Self {
1035 SinkError::Postgres(anyhow!(err))
1036 }
1037}