1pub mod big_query;
15pub mod boxed;
16pub mod catalog;
17pub mod clickhouse;
18pub mod coordinate;
19pub mod decouple_checkpoint_log_sink;
20pub mod deltalake;
21pub mod doris;
22pub mod doris_starrocks_connector;
23pub mod dynamodb;
24pub mod elasticsearch_opensearch;
25pub mod encoder;
26pub mod file_sink;
27pub mod formatter;
28pub mod google_pubsub;
29pub mod iceberg;
30pub mod kafka;
31pub mod kinesis;
32pub mod log_store;
33pub mod mock_coordination_client;
34pub mod mongodb;
35pub mod mqtt;
36pub mod nats;
37pub mod postgres;
38pub mod pulsar;
39pub mod redis;
40pub mod remote;
41pub mod sqlserver;
42pub mod starrocks;
43pub mod test_sink;
44pub mod trivial;
45pub mod utils;
46pub mod writer;
47
48use std::collections::BTreeMap;
49use std::future::Future;
50use std::sync::{Arc, LazyLock};
51
52use ::clickhouse::error::Error as ClickHouseError;
53use ::deltalake::DeltaTableError;
54use ::redis::RedisError;
55use anyhow::anyhow;
56use async_trait::async_trait;
57use clickhouse::CLICKHOUSE_SINK;
58use decouple_checkpoint_log_sink::{
59 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
60 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
61};
62use deltalake::DELTALAKE_SINK;
63use futures::future::BoxFuture;
64use iceberg::ICEBERG_SINK;
65use opendal::Error as OpendalError;
66use prometheus::Registry;
67use risingwave_common::array::ArrayError;
68use risingwave_common::bitmap::Bitmap;
69use risingwave_common::catalog::{ColumnDesc, Field, Schema};
70use risingwave_common::config::StreamingConfig;
71use risingwave_common::hash::ActorId;
72use risingwave_common::metrics::{
73 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
74 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
75};
76use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
77use risingwave_common::secret::{LocalSecretManager, SecretError};
78use risingwave_common::session_config::sink_decouple::SinkDecouple;
79use risingwave_common::{
80 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
81 register_guarded_int_gauge_vec_with_registry,
82};
83use risingwave_pb::catalog::PbSinkType;
84use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
85use risingwave_rpc_client::MetaClient;
86use risingwave_rpc_client::error::RpcError;
87use sea_orm::DatabaseConnection;
88use starrocks::STARROCKS_SINK;
89use thiserror::Error;
90use thiserror_ext::AsReport;
91use tokio::sync::mpsc::UnboundedReceiver;
92pub use tracing;
93
94use self::catalog::{SinkFormatDesc, SinkType};
95use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
96use crate::error::ConnectorError;
97use crate::sink::catalog::desc::SinkDesc;
98use crate::sink::catalog::{SinkCatalog, SinkId};
99use crate::sink::file_sink::fs::FsSink;
100use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
101use crate::sink::writer::SinkWriter;
102
/// Size constant for bounded channels.
///
/// NOTE(review): not referenced in this file; presumably used as a channel capacity by sink
/// submodules (via `super::BOUNDED_CHANNEL_SIZE`) — confirm at the use sites before changing it.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes `$macro!` with the list of every sink variant, followed by any extra arguments.
///
/// The list is passed as one brace group of `{ VariantName, SinkType }` pairs, so `$macro!`
/// receives `{ { Redis, ... }, ..., { Table, ... } }` followed by `$(, $arg)*`. This single
/// list is what `def_sink_impl!`, `dispatch_sink!` and `match_sink_name_str!` expand over, so
/// adding a sink here adds it everywhere.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink },
                { Kafka, $crate::sink::kafka::KafkaSink },
                { Pulsar, $crate::sink::pulsar::PulsarSink },
                { BlackHole, $crate::sink::trivial::BlackHoleSink },
                { Kinesis, $crate::sink::kinesis::KinesisSink },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
                { Iceberg, $crate::sink::iceberg::IcebergSink },
                { Mqtt, $crate::sink::mqtt::MqttSink },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink },
                { Nats, $crate::sink::nats::NatsSink },
                { Jdbc, $crate::sink::remote::JdbcSink },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink },
                { Cassandra, $crate::sink::remote::CassandraSink },
                { HttpJava, $crate::sink::remote::HttpJavaSink },
                { Doris, $crate::sink::doris::DorisSink },
                { Starrocks, $crate::sink::starrocks::StarrocksSink },
                // Object-storage file sinks share the generic `FileSink` wrapper.
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>},

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>},
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>},

                // NOTE(review): unlike every other entry, `FsSink` is not `$crate`-qualified, so
                // expansions that emit the sink type (e.g. `match_sink_name_str!`,
                // `def_sink_impl!`) resolve it at the expansion site. This module imports it;
                // callers elsewhere presumably need `FsSink` in scope. Consider
                // `$crate::sink::file_sink::fs::FsSink`.
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink> },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>},
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
                { BigQuery, $crate::sink::big_query::BigQuerySink },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink },
                { Mongodb, $crate::sink::mongodb::MongodbSink },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink },
                { Postgres, $crate::sink::postgres::PostgresSink },

                { Test, $crate::sink::test_sink::TestSink },
                { Table, $crate::sink::trivial::TableSink }
            }
            $(,$arg)*
        }
    };
}
150
/// Evaluates `$body` with `$sink` bound to the payload of whichever `SinkImpl` variant `$impl`
/// holds.
///
/// Usage: `dispatch_sink!(sink_impl, sink, sink.some_method())`. The payload is the variant's
/// `Box<ConcreteSink>`, or a reference to it when `$impl` is a reference, per default binding
/// modes. The same `$body` is instantiated once per sink type, so it must type-check for every
/// sink.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: receives the sink list from `for_all_sinks!` and builds one match arm per
    // variant. The sink type is not emitted here, only the variant name.
    ({$({$variant_name:ident, $sink_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Public entry point: wraps `$impl` and `$body` in braces so each is a single token tree,
    // then asks `for_all_sinks!` to call back into the internal arm with the sink list.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
166
/// Matches a connector name against every sink's `Sink::SINK_NAME`.
///
/// On a match, a local type alias `$type_name` is declared for that concrete sink type and
/// `$body` is evaluated with it in scope. Otherwise `$on_other_closure` is called with the
/// unmatched name. `$body` and the closure must produce the same type.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: one string-constant match arm per sink from `for_all_sinks!`.
    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Public entry point: brace-wraps the expressions into single token trees and fetches the
    // sink list via `for_all_sinks!`.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
187
/// Property key whose value names the sink connector (read by `SinkImpl::new` and
/// `SinkParam::from_proto`).
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key for the legacy sink `type` option. `SinkParam::from_proto` reads it to derive a
/// format descriptor when none is given explicitly.
pub const SINK_TYPE_OPTION: &str = "type";
/// Option value `snapshot`.
/// NOTE(review): presumably requests creating the sink without backfill — confirm at use sites.
pub const SINK_WITHOUT_BACKFILL: &str = "snapshot";
/// Legacy `type` value, presumably selecting an append-only sink.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
/// Legacy `type` value, presumably selecting Debezium-formatted output.
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
/// Legacy `type` value, presumably selecting an upsert sink.
pub const SINK_TYPE_UPSERT: &str = "upsert";
/// Option key, presumably forcing an append-only sink even when the input is not append-only.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
195
196#[derive(Debug, Clone, PartialEq, Eq)]
197pub struct SinkParam {
198 pub sink_id: SinkId,
199 pub sink_name: String,
200 pub properties: BTreeMap<String, String>,
201 pub columns: Vec<ColumnDesc>,
202 pub downstream_pk: Vec<usize>,
203 pub sink_type: SinkType,
204 pub format_desc: Option<SinkFormatDesc>,
205 pub db_name: String,
206
207 pub sink_from_name: String,
213}
214
215impl SinkParam {
216 pub fn from_proto(pb_param: PbSinkParam) -> Self {
217 let table_schema = pb_param.table_schema.expect("should contain table schema");
218 let format_desc = match pb_param.format_desc {
219 Some(f) => f.try_into().ok(),
220 None => {
221 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
222 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
223 match (connector, r#type) {
224 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
225 _ => None,
226 }
227 }
228 };
229 Self {
230 sink_id: SinkId::from(pb_param.sink_id),
231 sink_name: pb_param.sink_name,
232 properties: pb_param.properties,
233 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
234 downstream_pk: table_schema
235 .pk_indices
236 .iter()
237 .map(|i| *i as usize)
238 .collect(),
239 sink_type: SinkType::from_proto(
240 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
241 ),
242 format_desc,
243 db_name: pb_param.db_name,
244 sink_from_name: pb_param.sink_from_name,
245 }
246 }
247
248 pub fn to_proto(&self) -> PbSinkParam {
249 PbSinkParam {
250 sink_id: self.sink_id.sink_id,
251 sink_name: self.sink_name.clone(),
252 properties: self.properties.clone(),
253 table_schema: Some(TableSchema {
254 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
255 pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
256 }),
257 sink_type: self.sink_type.to_proto().into(),
258 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
259 db_name: self.db_name.clone(),
260 sink_from_name: self.sink_from_name.clone(),
261 }
262 }
263
264 pub fn schema(&self) -> Schema {
265 Schema {
266 fields: self.columns.iter().map(Field::from).collect(),
267 }
268 }
269
270 pub fn fill_secret_for_format_desc(
273 format_desc: Option<SinkFormatDesc>,
274 ) -> Result<Option<SinkFormatDesc>> {
275 match format_desc {
276 Some(mut format_desc) => {
277 format_desc.options = LocalSecretManager::global()
278 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
279 Ok(Some(format_desc))
280 }
281 None => Ok(None),
282 }
283 }
284
285 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
287 let columns = sink_catalog
288 .visible_columns()
289 .map(|col| col.column_desc.clone())
290 .collect();
291 let properties_with_secret = LocalSecretManager::global()
292 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
293 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
294 Ok(Self {
295 sink_id: sink_catalog.id,
296 sink_name: sink_catalog.name,
297 properties: properties_with_secret,
298 columns,
299 downstream_pk: sink_catalog.downstream_pk,
300 sink_type: sink_catalog.sink_type,
301 format_desc: format_desc_with_secret,
302 db_name: sink_catalog.db_name,
303 sink_from_name: sink_catalog.sink_from_name,
304 })
305 }
306}
307
308pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
309 LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
310
311#[derive(Clone)]
312pub struct SinkMetrics {
313 pub sink_commit_duration: LabelGuardedHistogramVec<4>,
314 pub connector_sink_rows_received: LabelGuardedIntCounterVec<4>,
315
316 pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<3>,
318 pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<3>,
319 pub log_store_write_rows: LabelGuardedIntCounterVec<3>,
320
321 pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>,
323 pub log_store_read_rows: LabelGuardedIntCounterVec<4>,
324 pub log_store_read_bytes: LabelGuardedIntCounterVec<4>,
325 pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>,
326
327 pub iceberg_write_qps: LabelGuardedIntCounterVec<3>,
329 pub iceberg_write_latency: LabelGuardedHistogramVec<3>,
330 pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>,
331 pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>,
332 pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>,
333 pub iceberg_write_bytes: LabelGuardedIntCounterVec<3>,
334}
335
336impl SinkMetrics {
337 pub fn new(registry: &Registry) -> Self {
338 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
339 "sink_commit_duration",
340 "Duration of commit op in sink",
341 &["actor_id", "connector", "sink_id", "sink_name"],
342 registry
343 )
344 .unwrap();
345
346 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
347 "connector_sink_rows_received",
348 "Number of rows received by sink",
349 &["actor_id", "connector_type", "sink_id", "sink_name"],
350 registry
351 )
352 .unwrap();
353
354 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
355 "log_store_first_write_epoch",
356 "The first write epoch of log store",
357 &["actor_id", "sink_id", "sink_name"],
358 registry
359 )
360 .unwrap();
361
362 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
363 "log_store_latest_write_epoch",
364 "The latest write epoch of log store",
365 &["actor_id", "sink_id", "sink_name"],
366 registry
367 )
368 .unwrap();
369
370 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
371 "log_store_write_rows",
372 "The write rate of rows",
373 &["actor_id", "sink_id", "sink_name"],
374 registry
375 )
376 .unwrap();
377
378 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
379 "log_store_latest_read_epoch",
380 "The latest read epoch of log store",
381 &["actor_id", "connector", "sink_id", "sink_name"],
382 registry
383 )
384 .unwrap();
385
386 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
387 "log_store_read_rows",
388 "The read rate of rows",
389 &["actor_id", "connector", "sink_id", "sink_name"],
390 registry
391 )
392 .unwrap();
393
394 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
395 "log_store_read_bytes",
396 "Total size of chunks read by log reader",
397 &["actor_id", "connector", "sink_id", "sink_name"],
398 registry
399 )
400 .unwrap();
401
402 let log_store_reader_wait_new_future_duration_ns =
403 register_guarded_int_counter_vec_with_registry!(
404 "log_store_reader_wait_new_future_duration_ns",
405 "Accumulated duration of LogReader to wait for next call to create future",
406 &["actor_id", "connector", "sink_id", "sink_name"],
407 registry
408 )
409 .unwrap();
410
411 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
412 "iceberg_write_qps",
413 "The qps of iceberg writer",
414 &["actor_id", "sink_id", "sink_name"],
415 registry
416 )
417 .unwrap();
418
419 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
420 "iceberg_write_latency",
421 "The latency of iceberg writer",
422 &["actor_id", "sink_id", "sink_name"],
423 registry
424 )
425 .unwrap();
426
427 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
428 "iceberg_rolling_unflushed_data_file",
429 "The unflushed data file count of iceberg rolling writer",
430 &["actor_id", "sink_id", "sink_name"],
431 registry
432 )
433 .unwrap();
434
435 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
436 "iceberg_position_delete_cache_num",
437 "The delete cache num of iceberg position delete writer",
438 &["actor_id", "sink_id", "sink_name"],
439 registry
440 )
441 .unwrap();
442
443 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
444 "iceberg_partition_num",
445 "The partition num of iceberg partition writer",
446 &["actor_id", "sink_id", "sink_name"],
447 registry
448 )
449 .unwrap();
450
451 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
452 "iceberg_write_bytes",
453 "The write bytes of iceberg writer",
454 &["actor_id", "sink_id", "sink_name"],
455 registry
456 )
457 .unwrap();
458
459 Self {
460 sink_commit_duration,
461 connector_sink_rows_received,
462 log_store_first_write_epoch,
463 log_store_latest_write_epoch,
464 log_store_write_rows,
465 log_store_latest_read_epoch,
466 log_store_read_rows,
467 log_store_read_bytes,
468 log_store_reader_wait_new_future_duration_ns,
469 iceberg_write_qps,
470 iceberg_write_latency,
471 iceberg_rolling_unflushed_data_file,
472 iceberg_position_delete_cache_num,
473 iceberg_partition_num,
474 iceberg_write_bytes,
475 }
476 }
477}
478
479#[derive(Clone)]
480pub struct SinkWriterParam {
481 pub executor_id: u64,
483 pub vnode_bitmap: Option<Bitmap>,
484 pub meta_client: Option<SinkMetaClient>,
485 pub extra_partition_col_idx: Option<usize>,
490
491 pub actor_id: ActorId,
492 pub sink_id: SinkId,
493 pub sink_name: String,
494 pub connector: String,
495 pub streaming_config: StreamingConfig,
496}
497
498#[derive(Clone)]
499pub struct SinkWriterMetrics {
500 pub sink_commit_duration: LabelGuardedHistogram<4>,
501 pub connector_sink_rows_received: LabelGuardedIntCounter<4>,
502}
503
504impl SinkWriterMetrics {
505 pub fn new(writer_param: &SinkWriterParam) -> Self {
506 let labels = [
507 &writer_param.actor_id.to_string(),
508 writer_param.connector.as_str(),
509 &writer_param.sink_id.to_string(),
510 writer_param.sink_name.as_str(),
511 ];
512 let sink_commit_duration = GLOBAL_SINK_METRICS
513 .sink_commit_duration
514 .with_guarded_label_values(&labels);
515 let connector_sink_rows_received = GLOBAL_SINK_METRICS
516 .connector_sink_rows_received
517 .with_guarded_label_values(&labels);
518 Self {
519 sink_commit_duration,
520 connector_sink_rows_received,
521 }
522 }
523
524 #[cfg(test)]
525 pub fn for_test() -> Self {
526 Self {
527 sink_commit_duration: LabelGuardedHistogram::test_histogram(),
528 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(),
529 }
530 }
531}
532
533#[derive(Clone)]
534pub enum SinkMetaClient {
535 MetaClient(MetaClient),
536 MockMetaClient(MockMetaClient),
537}
538
539impl SinkMetaClient {
540 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
541 match self {
542 SinkMetaClient::MetaClient(meta_client) => {
543 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
544 meta_client.sink_coordinate_client().await,
545 )
546 }
547 SinkMetaClient::MockMetaClient(mock_meta_client) => {
548 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
549 mock_meta_client.sink_coordinate_client(),
550 )
551 }
552 }
553 }
554
555 pub async fn add_sink_fail_evet_log(
556 &self,
557 sink_id: u32,
558 sink_name: String,
559 connector: String,
560 error: String,
561 ) {
562 match self {
563 SinkMetaClient::MetaClient(meta_client) => {
564 match meta_client
565 .add_sink_fail_evet(sink_id, sink_name, connector, error)
566 .await
567 {
568 Ok(_) => {}
569 Err(e) => {
570 tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log.");
571 }
572 }
573 }
574 SinkMetaClient::MockMetaClient(_) => {}
575 }
576 }
577}
578
579impl SinkWriterParam {
580 pub fn for_test() -> Self {
581 SinkWriterParam {
582 executor_id: Default::default(),
583 vnode_bitmap: Default::default(),
584 meta_client: Default::default(),
585 extra_partition_col_idx: Default::default(),
586
587 actor_id: 1,
588 sink_id: SinkId::new(1),
589 sink_name: "test_sink".to_owned(),
590 connector: "test_connector".to_owned(),
591 streaming_config: StreamingConfig::default(),
592 }
593 }
594}
595
596fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
597 matches!(
598 sink_name,
599 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
600 )
601}
602pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
603 const SINK_NAME: &'static str;
604 type LogSinker: LogSinker;
605 type Coordinator: SinkCommitCoordinator;
606
607 fn set_default_commit_checkpoint_interval(
608 desc: &mut SinkDesc,
609 user_specified: &SinkDecouple,
610 ) -> Result<()> {
611 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
612 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
613 Some(commit_checkpoint_interval) => {
614 let commit_checkpoint_interval = commit_checkpoint_interval
615 .parse::<u64>()
616 .map_err(|e| SinkError::Config(anyhow!(e)))?;
617 if matches!(user_specified, SinkDecouple::Disable)
618 && commit_checkpoint_interval > 1
619 {
620 return Err(SinkError::Config(anyhow!(
621 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
622 )));
623 }
624 }
625 None => match user_specified {
626 SinkDecouple::Default | SinkDecouple::Enable => {
627 desc.properties.insert(
628 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
629 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
630 );
631 }
632 SinkDecouple::Disable => {
633 desc.properties.insert(
634 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
635 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
636 );
637 }
638 },
639 }
640 }
641 Ok(())
642 }
643
644 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
646 match user_specified {
647 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
648 SinkDecouple::Disable => Ok(false),
649 }
650 }
651
652 async fn validate(&self) -> Result<()>;
653 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
654
655 fn is_coordinated_sink(&self) -> bool {
656 false
657 }
658
659 #[expect(clippy::unused_async)]
660 async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
661 Err(SinkError::Coordinator(anyhow!("no coordinator")))
662 }
663}
664
665pub trait SinkLogReader: Send {
666 fn start_from(
667 &mut self,
668 start_offset: Option<u64>,
669 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
670 fn next_item(
674 &mut self,
675 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;
676
677 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
680}
681
682impl<R: LogReader> SinkLogReader for &mut R {
683 fn next_item(
684 &mut self,
685 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
686 <R as LogReader>::next_item(*self)
687 }
688
689 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
690 <R as LogReader>::truncate(*self, offset)
691 }
692
693 fn start_from(
694 &mut self,
695 start_offset: Option<u64>,
696 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
697 <R as LogReader>::start_from(*self, start_offset)
698 }
699}
700
701#[async_trait]
702pub trait LogSinker: 'static + Send {
703 async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
705}
706pub type SinkCommittedEpochSubscriber = Arc<
707 dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
708 + Send
709 + Sync
710 + 'static,
711>;
712
713#[async_trait]
714pub trait SinkCommitCoordinator {
715 async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
717 async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
722}
723
724pub struct DummySinkCommitCoordinator;
725
726#[async_trait]
727impl SinkCommitCoordinator for DummySinkCommitCoordinator {
728 async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
729 Ok(None)
730 }
731
732 async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
733 Ok(())
734 }
735}
736
737impl SinkImpl {
738 pub fn new(mut param: SinkParam) -> Result<Self> {
739 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
740
741 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
743
744 let sink_type = param
745 .properties
746 .get(CONNECTOR_TYPE_KEY)
747 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
748
749 let sink_type = sink_type.to_lowercase();
750 match_sink_name_str!(
751 sink_type.as_str(),
752 SinkType,
753 Ok(SinkType::try_from(param)?.into()),
754 |other| {
755 Err(SinkError::Config(anyhow!(
756 "unsupported sink connector {}",
757 other
758 )))
759 }
760 )
761 }
762
763 pub fn is_sink_into_table(&self) -> bool {
764 matches!(self, SinkImpl::Table(_))
765 }
766
767 pub fn is_blackhole(&self) -> bool {
768 matches!(self, SinkImpl::BlackHole(_))
769 }
770
771 pub fn is_coordinated_sink(&self) -> bool {
772 dispatch_sink!(self, sink, sink.is_coordinated_sink())
773 }
774}
775
776pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
777 SinkImpl::new(param)
778}
779
// Defines the `SinkImpl` enum with one variant per sink from `for_all_sinks!`.
// Each variant boxes its concrete sink. The macro also defines
// `From<ConcreteSink> for SinkImpl` so a concrete sink converts with `.into()`.
macro_rules! def_sink_impl {
    // Entry point: fetch the sink list and call back into the arm below.
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
803
804pub type Result<T> = std::result::Result<T, SinkError>;
805
806#[derive(Error, Debug)]
807pub enum SinkError {
808 #[error("Kafka error: {0}")]
809 Kafka(#[from] rdkafka::error::KafkaError),
810 #[error("Kinesis error: {0}")]
811 Kinesis(
812 #[source]
813 #[backtrace]
814 anyhow::Error,
815 ),
816 #[error("Remote sink error: {0}")]
817 Remote(
818 #[source]
819 #[backtrace]
820 anyhow::Error,
821 ),
822 #[error("Encode error: {0}")]
823 Encode(String),
824 #[error("Avro error: {0}")]
825 Avro(#[from] apache_avro::Error),
826 #[error("Iceberg error: {0}")]
827 Iceberg(
828 #[source]
829 #[backtrace]
830 anyhow::Error,
831 ),
832 #[error("config error: {0}")]
833 Config(
834 #[source]
835 #[backtrace]
836 anyhow::Error,
837 ),
838 #[error("coordinator error: {0}")]
839 Coordinator(
840 #[source]
841 #[backtrace]
842 anyhow::Error,
843 ),
844 #[error("ClickHouse error: {0}")]
845 ClickHouse(String),
846 #[error("Redis error: {0}")]
847 Redis(String),
848 #[error("Mqtt error: {0}")]
849 Mqtt(
850 #[source]
851 #[backtrace]
852 anyhow::Error,
853 ),
854 #[error("Nats error: {0}")]
855 Nats(
856 #[source]
857 #[backtrace]
858 anyhow::Error,
859 ),
860 #[error("Google Pub/Sub error: {0}")]
861 GooglePubSub(
862 #[source]
863 #[backtrace]
864 anyhow::Error,
865 ),
866 #[error("Doris/Starrocks connect error: {0}")]
867 DorisStarrocksConnect(
868 #[source]
869 #[backtrace]
870 anyhow::Error,
871 ),
872 #[error("Doris error: {0}")]
873 Doris(String),
874 #[error("DeltaLake error: {0}")]
875 DeltaLake(
876 #[source]
877 #[backtrace]
878 anyhow::Error,
879 ),
880 #[error("ElasticSearch/OpenSearch error: {0}")]
881 ElasticSearchOpenSearch(
882 #[source]
883 #[backtrace]
884 anyhow::Error,
885 ),
886 #[error("Starrocks error: {0}")]
887 Starrocks(String),
888 #[error("File error: {0}")]
889 File(String),
890 #[error("Pulsar error: {0}")]
891 Pulsar(
892 #[source]
893 #[backtrace]
894 anyhow::Error,
895 ),
896 #[error(transparent)]
897 Internal(
898 #[from]
899 #[backtrace]
900 anyhow::Error,
901 ),
902 #[error("BigQuery error: {0}")]
903 BigQuery(
904 #[source]
905 #[backtrace]
906 anyhow::Error,
907 ),
908 #[error("DynamoDB error: {0}")]
909 DynamoDb(
910 #[source]
911 #[backtrace]
912 anyhow::Error,
913 ),
914 #[error("SQL Server error: {0}")]
915 SqlServer(
916 #[source]
917 #[backtrace]
918 anyhow::Error,
919 ),
920 #[error("Postgres error: {0}")]
921 Postgres(
922 #[source]
923 #[backtrace]
924 anyhow::Error,
925 ),
926 #[error(transparent)]
927 Connector(
928 #[from]
929 #[backtrace]
930 ConnectorError,
931 ),
932 #[error("Secret error: {0}")]
933 Secret(
934 #[from]
935 #[backtrace]
936 SecretError,
937 ),
938 #[error("Mongodb error: {0}")]
939 Mongodb(
940 #[source]
941 #[backtrace]
942 anyhow::Error,
943 ),
944}
945
946impl From<sea_orm::DbErr> for SinkError {
947 fn from(err: sea_orm::DbErr) -> Self {
948 SinkError::Iceberg(anyhow!(err))
949 }
950}
951
952impl From<OpendalError> for SinkError {
953 fn from(error: OpendalError) -> Self {
954 SinkError::File(error.to_report_string())
955 }
956}
957
958impl From<parquet::errors::ParquetError> for SinkError {
959 fn from(error: parquet::errors::ParquetError) -> Self {
960 SinkError::File(error.to_report_string())
961 }
962}
963
964impl From<ArrayError> for SinkError {
965 fn from(error: ArrayError) -> Self {
966 SinkError::File(error.to_report_string())
967 }
968}
969
970impl From<RpcError> for SinkError {
971 fn from(value: RpcError) -> Self {
972 SinkError::Remote(anyhow!(value))
973 }
974}
975
976impl From<ClickHouseError> for SinkError {
977 fn from(value: ClickHouseError) -> Self {
978 SinkError::ClickHouse(value.to_report_string())
979 }
980}
981
982impl From<DeltaTableError> for SinkError {
983 fn from(value: DeltaTableError) -> Self {
984 SinkError::DeltaLake(anyhow!(value))
985 }
986}
987
988impl From<RedisError> for SinkError {
989 fn from(value: RedisError) -> Self {
990 SinkError::Redis(value.to_report_string())
991 }
992}
993
994impl From<tiberius::error::Error> for SinkError {
995 fn from(err: tiberius::error::Error) -> Self {
996 SinkError::SqlServer(anyhow!(err))
997 }
998}
999
1000impl From<::elasticsearch::Error> for SinkError {
1001 fn from(err: ::elasticsearch::Error) -> Self {
1002 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1003 }
1004}
1005
1006impl From<::opensearch::Error> for SinkError {
1007 fn from(err: ::opensearch::Error) -> Self {
1008 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1009 }
1010}
1011
1012impl From<tokio_postgres::Error> for SinkError {
1013 fn from(err: tokio_postgres::Error) -> Self {
1014 SinkError::Postgres(anyhow!(err))
1015 }
1016}