1pub mod big_query;
15pub mod boxed;
16pub mod catalog;
17pub mod clickhouse;
18pub mod coordinate;
19pub mod decouple_checkpoint_log_sink;
20pub mod deltalake;
21pub mod doris;
22pub mod doris_starrocks_connector;
23pub mod dynamodb;
24pub mod elasticsearch_opensearch;
25pub mod encoder;
26pub mod file_sink;
27pub mod formatter;
28pub mod google_pubsub;
29pub mod iceberg;
30pub mod kafka;
31pub mod kinesis;
32use risingwave_common::bail;
33pub mod jdbc_jni_client;
34pub mod log_store;
35pub mod mock_coordination_client;
36pub mod mongodb;
37pub mod mqtt;
38pub mod nats;
39pub mod postgres;
40pub mod pulsar;
41pub mod redis;
42pub mod remote;
43pub mod snowflake_redshift;
44pub mod sqlserver;
45pub mod starrocks;
46pub mod test_sink;
47pub mod trivial;
48pub mod utils;
49pub mod writer;
50pub mod prelude {
51 pub use crate::sink::{
52 Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
53 SinkParam, SinkWriterParam,
54 };
55}
56
57use std::collections::BTreeMap;
58use std::future::Future;
59use std::sync::{Arc, LazyLock};
60
61use ::clickhouse::error::Error as ClickHouseError;
62use ::redis::RedisError;
63use anyhow::anyhow;
64use async_trait::async_trait;
65use clickhouse::CLICKHOUSE_SINK;
66use decouple_checkpoint_log_sink::{
67 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
68 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
69};
70use deltalake::DELTALAKE_SINK;
71use futures::future::BoxFuture;
72use iceberg::ICEBERG_SINK;
73use opendal::Error as OpendalError;
74use prometheus::Registry;
75use risingwave_common::array::ArrayError;
76use risingwave_common::bitmap::Bitmap;
77use risingwave_common::catalog::{ColumnDesc, Field, Schema};
78use risingwave_common::config::StreamingConfig;
79use risingwave_common::hash::ActorId;
80use risingwave_common::metrics::{
81 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
82 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
83};
84use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
85use risingwave_common::secret::{LocalSecretManager, SecretError};
86use risingwave_common::session_config::sink_decouple::SinkDecouple;
87use risingwave_common::{
88 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
89 register_guarded_int_gauge_vec_with_registry,
90};
91use risingwave_pb::catalog::PbSinkType;
92use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
93use risingwave_rpc_client::MetaClient;
94use risingwave_rpc_client::error::RpcError;
95use sea_orm::DatabaseConnection;
96use starrocks::STARROCKS_SINK;
97use thiserror::Error;
98use thiserror_ext::AsReport;
99use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
100pub use tracing;
101
102use self::catalog::{SinkFormatDesc, SinkType};
103use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
104use crate::WithPropertiesExt;
105use crate::connector_common::IcebergSinkCompactionUpdate;
106use crate::error::{ConnectorError, ConnectorResult};
107use crate::sink::catalog::desc::SinkDesc;
108use crate::sink::catalog::{SinkCatalog, SinkId};
109use crate::sink::file_sink::fs::FsSink;
110use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
111use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
112use crate::sink::writer::SinkWriter;
113
// Fixed capacity of 16. Presumably the buffer size used when sink code creates bounded
// channels; no use is visible in this part of the file — TODO confirm against callers.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Central registry of all sink connectors.
///
/// Invokes `$macro` with a brace-delimited, comma-separated list of
/// `{ Variant, SinkType, ConfigType }` entries (one per sink), followed by any extra
/// `$arg` token trees passed by the caller. Entries whose third element is `()` have no
/// config type path to offer.
///
/// Other macros in this file (`dispatch_sink!`, `match_sink_name_str!`, `def_sink_impl!`,
/// `generate_config_use_clauses!`) are written to be used as `$macro`.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
160
/// Callback for `for_all_sinks!`: for every `{ Variant, SinkType, ConfigType }` entry,
/// forwards the config-type tokens to `generate_config_use_single!`.
///
/// The variant name and sink type of each entry are matched but not used.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
169
/// Emits a single `pub(super) use` item for one sink config type.
///
/// * `()` — the entry has no config type path, so nothing is generated.
/// * a path — expands to `pub(super) use <path>;`, with `unused_imports` allowed on it.
#[macro_export]
macro_rules! generate_config_use_single {
    // No config type for this sink: expand to nothing.
    (()) => {};

    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
181
/// Expands to `for_all_sinks! { generate_config_use_clauses }`, i.e. one
/// `pub(super) use` item per sink entry that names a config type path.
///
/// Because the generated items are `pub(super)`, the invocation site must be inside a
/// module that has a parent.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
189
/// Runs `$body` against whichever concrete sink a `SinkImpl` value holds.
///
/// Intended call form (second arm): `dispatch_sink!(impl_expr, binding, body_expr)`.
/// That arm re-enters this macro through `for_all_sinks!`, which supplies the sink list
/// consumed by the first arm. The first arm then expands to a `match` over `$impl` with
/// one `SinkImpl::$variant_name($sink) => $body` arm per listed sink, so `$sink` is bound
/// to the variant payload inside `$body`.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: receives the sink list from `for_all_sinks!`.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Entry arm: wraps the expressions in braces so they travel as single token trees.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
205
/// Selects a sink type by its connector name string and evaluates `$body` with that type
/// in scope.
///
/// Intended call form (second arm):
/// `match_sink_name_str!(name_str_expr, TypeAlias, body_expr, on_other_closure)`.
/// The expansion is a `match` on `$name_str` with one arm per sink in `for_all_sinks!`,
/// keyed on `<SinkType as Sink>::SINK_NAME`. Inside a matching arm, `$type_name` is
/// declared as a type alias for the concrete sink type before `$body` is evaluated.
/// Any other string is passed to `$on_other_closure`, whose result becomes the value of
/// the `match`.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: receives the sink list from `for_all_sinks!`.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Entry arm: wraps the expressions in braces so they travel as single token trees.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
226
/// Property key whose value names the sink connector; looked up by
/// `SinkParam::from_proto` and `SinkImpl::new`.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key `type`; `SinkParam::from_proto` reads it as the legacy sink type when no
/// format description is present.
pub const SINK_TYPE_OPTION: &str = "type";
/// Property key `snapshot`. Not referenced in this part of the file — semantics are
/// defined by its users (TODO confirm).
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
/// The string `append-only`; presumably a value for [`SINK_TYPE_OPTION`] — verify against callers.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
/// The string `debezium`; presumably a value for [`SINK_TYPE_OPTION`] — verify against callers.
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
/// The string `upsert`; presumably a value for [`SINK_TYPE_OPTION`] — verify against callers.
pub const SINK_TYPE_UPSERT: &str = "upsert";
/// Property key `force_append_only`. Not referenced in this part of the file (TODO confirm usage).
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
235
/// Parameters describing one sink, convertible to and from the protobuf `PbSinkParam`
/// and constructible from a `SinkCatalog`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    /// Identifier of the sink.
    pub sink_id: SinkId,
    /// Name of the sink.
    pub sink_name: String,
    /// Connector properties. When built via `try_from_sink_catalog`, secrets have
    /// already been filled in by the local secret manager.
    pub properties: BTreeMap<String, String>,
    /// Column descriptors of the sink. When built via `try_from_sink_catalog`, only the
    /// catalog's visible columns are included.
    pub columns: Vec<ColumnDesc>,
    /// Primary-key indices for the downstream; serialized as `TableSchema::pk_indices`.
    /// Presumably positions into `columns` — verify against callers.
    pub downstream_pk: Vec<usize>,
    /// Kind of the sink (see `SinkType`).
    pub sink_type: SinkType,
    /// Optional format description. `from_proto` may derive it from the legacy
    /// `connector` / `type` properties when the protobuf carries none.
    pub format_desc: Option<SinkFormatDesc>,
    /// Database name carried alongside the sink.
    pub db_name: String,

    /// Copied verbatim from the protobuf / catalog field of the same name; presumably
    /// the name of the upstream object this sink reads from — TODO confirm.
    pub sink_from_name: String,
}
254
255impl SinkParam {
256 pub fn from_proto(pb_param: PbSinkParam) -> Self {
257 let table_schema = pb_param.table_schema.expect("should contain table schema");
258 let format_desc = match pb_param.format_desc {
259 Some(f) => f.try_into().ok(),
260 None => {
261 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
262 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
263 match (connector, r#type) {
264 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
265 _ => None,
266 }
267 }
268 };
269 Self {
270 sink_id: SinkId::from(pb_param.sink_id),
271 sink_name: pb_param.sink_name,
272 properties: pb_param.properties,
273 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
274 downstream_pk: table_schema
275 .pk_indices
276 .iter()
277 .map(|i| *i as usize)
278 .collect(),
279 sink_type: SinkType::from_proto(
280 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
281 ),
282 format_desc,
283 db_name: pb_param.db_name,
284 sink_from_name: pb_param.sink_from_name,
285 }
286 }
287
288 pub fn to_proto(&self) -> PbSinkParam {
289 PbSinkParam {
290 sink_id: self.sink_id.sink_id,
291 sink_name: self.sink_name.clone(),
292 properties: self.properties.clone(),
293 table_schema: Some(TableSchema {
294 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
295 pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
296 }),
297 sink_type: self.sink_type.to_proto().into(),
298 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
299 db_name: self.db_name.clone(),
300 sink_from_name: self.sink_from_name.clone(),
301 }
302 }
303
304 pub fn schema(&self) -> Schema {
305 Schema {
306 fields: self.columns.iter().map(Field::from).collect(),
307 }
308 }
309
310 pub fn fill_secret_for_format_desc(
313 format_desc: Option<SinkFormatDesc>,
314 ) -> Result<Option<SinkFormatDesc>> {
315 match format_desc {
316 Some(mut format_desc) => {
317 format_desc.options = LocalSecretManager::global()
318 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
319 Ok(Some(format_desc))
320 }
321 None => Ok(None),
322 }
323 }
324
325 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
327 let columns = sink_catalog
328 .visible_columns()
329 .map(|col| col.column_desc.clone())
330 .collect();
331 let properties_with_secret = LocalSecretManager::global()
332 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
333 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
334 Ok(Self {
335 sink_id: sink_catalog.id,
336 sink_name: sink_catalog.name,
337 properties: properties_with_secret,
338 columns,
339 downstream_pk: sink_catalog.downstream_pk,
340 sink_type: sink_catalog.sink_type,
341 format_desc: format_desc_with_secret,
342 db_name: sink_catalog.db_name,
343 sink_from_name: sink_catalog.sink_from_name,
344 })
345 }
346}
347
348pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
349 use crate::enforce_secret::EnforceSecret;
350
351 let connector = props
352 .get_connector()
353 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
354 let key_iter = props.key_iter();
355 match_sink_name_str!(
356 connector.as_str(),
357 PropType,
358 PropType::enforce_secret(key_iter),
359 |other| bail!("connector '{}' is not supported", other)
360 )
361}
362
/// Process-wide sink metrics, lazily created on first access and registered against
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
365
/// Label-guarded metric vectors for sinks, the log store, and the iceberg writer.
///
/// Each field's metric name matches the field name; see `SinkMetrics::new` for the help
/// text and label sets.
#[derive(Clone)]
pub struct SinkMetrics {
    /// Duration of commit op in sink.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    /// Number of rows received by sink.
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics.
    /// The first write epoch of log store.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    /// The latest write epoch of log store.
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    /// The write rate of rows.
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics.
    /// The latest read epoch of log store.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    /// The read rate of rows.
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    /// Total size of chunks read by log reader.
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    /// Accumulated duration of `LogReader` to wait for next call to create future.
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics.
    /// The qps of iceberg writer.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    /// The latency of iceberg writer.
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    /// The unflushed data file count of iceberg rolling writer.
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    /// The delete cache num of iceberg position delete writer.
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    /// The partition num of iceberg partition writer.
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    /// The write bytes of iceberg writer.
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    /// The snapshot number of iceberg table.
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
391
392impl SinkMetrics {
393 pub fn new(registry: &Registry) -> Self {
394 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
395 "sink_commit_duration",
396 "Duration of commit op in sink",
397 &["actor_id", "connector", "sink_id", "sink_name"],
398 registry
399 )
400 .unwrap();
401
402 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
403 "connector_sink_rows_received",
404 "Number of rows received by sink",
405 &["actor_id", "connector_type", "sink_id", "sink_name"],
406 registry
407 )
408 .unwrap();
409
410 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
411 "log_store_first_write_epoch",
412 "The first write epoch of log store",
413 &["actor_id", "sink_id", "sink_name"],
414 registry
415 )
416 .unwrap();
417
418 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
419 "log_store_latest_write_epoch",
420 "The latest write epoch of log store",
421 &["actor_id", "sink_id", "sink_name"],
422 registry
423 )
424 .unwrap();
425
426 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
427 "log_store_write_rows",
428 "The write rate of rows",
429 &["actor_id", "sink_id", "sink_name"],
430 registry
431 )
432 .unwrap();
433
434 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
435 "log_store_latest_read_epoch",
436 "The latest read epoch of log store",
437 &["actor_id", "connector", "sink_id", "sink_name"],
438 registry
439 )
440 .unwrap();
441
442 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
443 "log_store_read_rows",
444 "The read rate of rows",
445 &["actor_id", "connector", "sink_id", "sink_name"],
446 registry
447 )
448 .unwrap();
449
450 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
451 "log_store_read_bytes",
452 "Total size of chunks read by log reader",
453 &["actor_id", "connector", "sink_id", "sink_name"],
454 registry
455 )
456 .unwrap();
457
458 let log_store_reader_wait_new_future_duration_ns =
459 register_guarded_int_counter_vec_with_registry!(
460 "log_store_reader_wait_new_future_duration_ns",
461 "Accumulated duration of LogReader to wait for next call to create future",
462 &["actor_id", "connector", "sink_id", "sink_name"],
463 registry
464 )
465 .unwrap();
466
467 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
468 "iceberg_write_qps",
469 "The qps of iceberg writer",
470 &["actor_id", "sink_id", "sink_name"],
471 registry
472 )
473 .unwrap();
474
475 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
476 "iceberg_write_latency",
477 "The latency of iceberg writer",
478 &["actor_id", "sink_id", "sink_name"],
479 registry
480 )
481 .unwrap();
482
483 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
484 "iceberg_rolling_unflushed_data_file",
485 "The unflushed data file count of iceberg rolling writer",
486 &["actor_id", "sink_id", "sink_name"],
487 registry
488 )
489 .unwrap();
490
491 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
492 "iceberg_position_delete_cache_num",
493 "The delete cache num of iceberg position delete writer",
494 &["actor_id", "sink_id", "sink_name"],
495 registry
496 )
497 .unwrap();
498
499 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
500 "iceberg_partition_num",
501 "The partition num of iceberg partition writer",
502 &["actor_id", "sink_id", "sink_name"],
503 registry
504 )
505 .unwrap();
506
507 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
508 "iceberg_write_bytes",
509 "The write bytes of iceberg writer",
510 &["actor_id", "sink_id", "sink_name"],
511 registry
512 )
513 .unwrap();
514
515 let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
516 "iceberg_snapshot_num",
517 "The snapshot number of iceberg table",
518 &["sink_name", "catalog_name", "table_name"],
519 registry
520 )
521 .unwrap();
522
523 Self {
524 sink_commit_duration,
525 connector_sink_rows_received,
526 log_store_first_write_epoch,
527 log_store_latest_write_epoch,
528 log_store_write_rows,
529 log_store_latest_read_epoch,
530 log_store_read_rows,
531 log_store_read_bytes,
532 log_store_reader_wait_new_future_duration_ns,
533 iceberg_write_qps,
534 iceberg_write_latency,
535 iceberg_rolling_unflushed_data_file,
536 iceberg_position_delete_cache_num,
537 iceberg_partition_num,
538 iceberg_write_bytes,
539 iceberg_snapshot_num,
540 }
541 }
542}
543
/// Parameters handed to a sink when creating its log sinker (see `Sink::new_log_sinker`).
#[derive(Clone)]
pub struct SinkWriterParam {
    /// Identifier of the executor; presumably the one that owns the sink — TODO confirm.
    pub executor_id: u64,
    /// Optional vnode bitmap; presumably the vnodes handled by this writer — TODO confirm.
    pub vnode_bitmap: Option<Bitmap>,
    /// Optional client used to reach the meta service (real or mock).
    pub meta_client: Option<SinkMetaClient>,
    /// Optional index of an extra partition column; presumably a position in the
    /// incoming chunk's columns — verify against the sinks that read it.
    pub extra_partition_col_idx: Option<usize>,

    /// Actor id; used as the `actor_id` metric label in `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    /// Sink id; used as the `sink_id` metric label in `SinkWriterMetrics::new`.
    pub sink_id: SinkId,
    /// Sink name; used as the `sink_name` metric label in `SinkWriterMetrics::new`.
    pub sink_name: String,
    /// Connector name; used as the connector metric label in `SinkWriterMetrics::new`.
    pub connector: String,
    /// Streaming configuration carried along with the writer parameters.
    pub streaming_config: StreamingConfig,
}
562
/// Per-writer metric handles, obtained from `GLOBAL_SINK_METRICS` with the writer's labels.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    /// Histogram from the `sink_commit_duration` metric vector.
    pub sink_commit_duration: LabelGuardedHistogram,
    /// Counter from the `connector_sink_rows_received` metric vector.
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
568
569impl SinkWriterMetrics {
570 pub fn new(writer_param: &SinkWriterParam) -> Self {
571 let labels = [
572 &writer_param.actor_id.to_string(),
573 writer_param.connector.as_str(),
574 &writer_param.sink_id.to_string(),
575 writer_param.sink_name.as_str(),
576 ];
577 let sink_commit_duration = GLOBAL_SINK_METRICS
578 .sink_commit_duration
579 .with_guarded_label_values(&labels);
580 let connector_sink_rows_received = GLOBAL_SINK_METRICS
581 .connector_sink_rows_received
582 .with_guarded_label_values(&labels);
583 Self {
584 sink_commit_duration,
585 connector_sink_rows_received,
586 }
587 }
588
589 #[cfg(test)]
590 pub fn for_test() -> Self {
591 Self {
592 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
593 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
594 }
595 }
596}
597
/// Handle to the meta service used by sinks: either the real RPC client or a mock.
#[derive(Clone)]
pub enum SinkMetaClient {
    /// Real meta client.
    MetaClient(MetaClient),
    /// Mock client; event-log reporting through it is a no-op
    /// (see `add_sink_fail_evet_log`).
    MockMetaClient(MockMetaClient),
}
603
604impl SinkMetaClient {
605 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
606 match self {
607 SinkMetaClient::MetaClient(meta_client) => {
608 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
609 meta_client.sink_coordinate_client().await,
610 )
611 }
612 SinkMetaClient::MockMetaClient(mock_meta_client) => {
613 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
614 mock_meta_client.sink_coordinate_client(),
615 )
616 }
617 }
618 }
619
620 pub async fn add_sink_fail_evet_log(
621 &self,
622 sink_id: u32,
623 sink_name: String,
624 connector: String,
625 error: String,
626 ) {
627 match self {
628 SinkMetaClient::MetaClient(meta_client) => {
629 match meta_client
630 .add_sink_fail_evet(sink_id, sink_name, connector, error)
631 .await
632 {
633 Ok(_) => {}
634 Err(e) => {
635 tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log.");
636 }
637 }
638 }
639 SinkMetaClient::MockMetaClient(_) => {}
640 }
641 }
642}
643
644impl SinkWriterParam {
645 pub fn for_test() -> Self {
646 SinkWriterParam {
647 executor_id: Default::default(),
648 vnode_bitmap: Default::default(),
649 meta_client: Default::default(),
650 extra_partition_col_idx: Default::default(),
651
652 actor_id: 1,
653 sink_id: SinkId::new(1),
654 sink_name: "test_sink".to_owned(),
655 connector: "test_connector".to_owned(),
656 streaming_config: StreamingConfig::default(),
657 }
658 }
659}
660
661fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
662 matches!(
663 sink_name,
664 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
665 )
666}
667pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
668 const SINK_NAME: &'static str;
669
670 type LogSinker: LogSinker;
671 #[expect(deprecated)]
672 type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;
673
674 fn set_default_commit_checkpoint_interval(
675 desc: &mut SinkDesc,
676 user_specified: &SinkDecouple,
677 ) -> Result<()> {
678 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
679 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
680 Some(commit_checkpoint_interval) => {
681 let commit_checkpoint_interval = commit_checkpoint_interval
682 .parse::<u64>()
683 .map_err(|e| SinkError::Config(anyhow!(e)))?;
684 if matches!(user_specified, SinkDecouple::Disable)
685 && commit_checkpoint_interval > 1
686 {
687 return Err(SinkError::Config(anyhow!(
688 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
689 )));
690 }
691 }
692 None => match user_specified {
693 SinkDecouple::Default | SinkDecouple::Enable => {
694 desc.properties.insert(
695 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
696 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
697 );
698 }
699 SinkDecouple::Disable => {
700 desc.properties.insert(
701 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
702 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
703 );
704 }
705 },
706 }
707 }
708 Ok(())
709 }
710
711 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
713 match user_specified {
714 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
715 SinkDecouple::Disable => Ok(false),
716 }
717 }
718
719 fn support_schema_change() -> bool {
720 false
721 }
722
723 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
724 Ok(())
725 }
726
727 async fn validate(&self) -> Result<()>;
728 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
729
730 fn is_coordinated_sink(&self) -> bool {
731 false
732 }
733
734 async fn new_coordinator(
735 &self,
736 _db: DatabaseConnection,
737 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
738 ) -> Result<Self::Coordinator> {
739 Err(SinkError::Coordinator(anyhow!("no coordinator")))
740 }
741}
742
/// Reader interface handed to a [`LogSinker`].
///
/// In this file it is implemented for `&mut R` where `R: LogReader`, by forwarding each
/// method to the `LogReader` method of the same name.
pub trait SinkLogReader: Send {
    /// Starts reading, optionally from `start_offset`. The exact meaning of `None` is
    /// defined by the underlying `LogReader` (not visible here).
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Yields the next item as a `(u64, LogStoreReadItem)` pair; the `u64` is presumably
    /// the epoch of the item — TODO confirm against `LogReader`.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log at `offset`; presumably signals that items up to that offset
    /// have been consumed — TODO confirm against `LogReader`.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
759
760impl<R: LogReader> SinkLogReader for &mut R {
761 fn next_item(
762 &mut self,
763 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
764 <R as LogReader>::next_item(*self)
765 }
766
767 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
768 <R as LogReader>::truncate(*self, offset)
769 }
770
771 fn start_from(
772 &mut self,
773 start_offset: Option<u64>,
774 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
775 <R as LogReader>::start_from(*self, start_offset)
776 }
777}
778
/// Consumer side of a sink: drains a log reader and writes to the external system.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Consumes `log_reader` and sinks its items. The success type is `!`, so this
    /// never returns `Ok`; it only returns by failing.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared callback that, given a sink id, asynchronously yields a `u64` together with a
/// receiver of further `u64` values.
///
/// Judging by the name these are committed epochs (an initial value plus subsequent
/// updates) — TODO confirm against the producer of this callback.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
790
/// Coordinator that commits metadata gathered for a sink.
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initializes the coordinator with a committed-epoch subscriber.
    ///
    /// Returns an optional `u64`; presumably an offset or epoch from which the log
    /// should resume — TODO confirm against implementations.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// Commits `metadata` for `epoch`.
    ///
    /// `add_columns` optionally carries fields to add; presumably used for schema
    /// change — TODO confirm against implementations.
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}
806
/// Placeholder coordinator type, used as the default for `Sink::Coordinator`.
///
/// It wraps the never type `!`, so no value of this type can be constructed.
#[deprecated]
pub struct NoSinkCommitCoordinator(!);

// Both methods take `&mut self`, and no instance can exist, so neither body can be
// reached at runtime.
#[expect(deprecated)]
#[async_trait]
impl SinkCommitCoordinator for NoSinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        unreachable!()
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        _metadata: Vec<SinkMetadata>,
        _add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        unreachable!()
    }
}
839
840impl SinkImpl {
841 pub fn new(mut param: SinkParam) -> Result<Self> {
842 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
843
844 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
846
847 let sink_type = param
848 .properties
849 .get(CONNECTOR_TYPE_KEY)
850 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
851
852 let sink_type = sink_type.to_lowercase();
853 match_sink_name_str!(
854 sink_type.as_str(),
855 SinkType,
856 Ok(SinkType::try_from(param)?.into()),
857 |other| {
858 Err(SinkError::Config(anyhow!(
859 "unsupported sink connector {}",
860 other
861 )))
862 }
863 )
864 }
865
866 pub fn is_sink_into_table(&self) -> bool {
867 matches!(self, SinkImpl::Table(_))
868 }
869
870 pub fn is_blackhole(&self) -> bool {
871 matches!(self, SinkImpl::BlackHole(_))
872 }
873
874 pub fn is_coordinated_sink(&self) -> bool {
875 dispatch_sink!(self, sink, sink.is_coordinated_sink())
876 }
877}
878
/// Builds the sink described by `param`; a thin wrapper around `SinkImpl::new`.
///
/// # Errors
/// Returns whatever error `SinkImpl::new` returns.
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
882
// Generates the `SinkImpl` enum from the sink list in `for_all_sinks!`:
// * one variant per sink, holding the concrete sink type in a `Box`;
// * one `From<ConcreteSink> for SinkImpl` impl per sink, which boxes the value.
macro_rules! def_sink_impl {
    // Entry arm: re-enter through `for_all_sinks!` to obtain the sink list.
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
906
/// Result type used throughout the sink module, with [`SinkError`] as the error.
pub type Result<T> = std::result::Result<T, SinkError>;
908
/// Error type of the sink module.
///
/// Most variants correspond to one connector or subsystem and wrap either an
/// `anyhow::Error` (kept as the error source, with backtrace) or a plain message string.
/// `Internal` and `Connector` are transparent: they display exactly as the wrapped error.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Also produced when converting from `RpcError`.
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    /// Also produced when converting from `sea_orm::DbErr`.
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    /// Also produced when converting from OpenDAL, Parquet and `ArrayError` errors.
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Catch-all for any `anyhow::Error`; this is the variant `?` converts to.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1048
1049impl From<sea_orm::DbErr> for SinkError {
1050 fn from(err: sea_orm::DbErr) -> Self {
1051 SinkError::Iceberg(anyhow!(err))
1052 }
1053}
1054
1055impl From<OpendalError> for SinkError {
1056 fn from(error: OpendalError) -> Self {
1057 SinkError::File(error.to_report_string())
1058 }
1059}
1060
1061impl From<parquet::errors::ParquetError> for SinkError {
1062 fn from(error: parquet::errors::ParquetError) -> Self {
1063 SinkError::File(error.to_report_string())
1064 }
1065}
1066
1067impl From<ArrayError> for SinkError {
1068 fn from(error: ArrayError) -> Self {
1069 SinkError::File(error.to_report_string())
1070 }
1071}
1072
1073impl From<RpcError> for SinkError {
1074 fn from(value: RpcError) -> Self {
1075 SinkError::Remote(anyhow!(value))
1076 }
1077}
1078
1079impl From<ClickHouseError> for SinkError {
1080 fn from(value: ClickHouseError) -> Self {
1081 SinkError::ClickHouse(value.to_report_string())
1082 }
1083}
1084
/// Delta table errors are reported under the `DeltaLake` variant. Only compiled when the
/// `sink-deltalake` feature is enabled.
#[cfg(feature = "sink-deltalake")]
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(delta_err: ::deltalake::DeltaTableError) -> Self {
        Self::DeltaLake(anyhow!(delta_err))
    }
}
1091
1092impl From<RedisError> for SinkError {
1093 fn from(value: RedisError) -> Self {
1094 SinkError::Redis(value.to_report_string())
1095 }
1096}
1097
1098impl From<tiberius::error::Error> for SinkError {
1099 fn from(err: tiberius::error::Error) -> Self {
1100 SinkError::SqlServer(anyhow!(err))
1101 }
1102}
1103
1104impl From<::elasticsearch::Error> for SinkError {
1105 fn from(err: ::elasticsearch::Error) -> Self {
1106 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1107 }
1108}
1109
1110impl From<::opensearch::Error> for SinkError {
1111 fn from(err: ::opensearch::Error) -> Self {
1112 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1113 }
1114}
1115
1116impl From<tokio_postgres::Error> for SinkError {
1117 fn from(err: tokio_postgres::Error) -> Self {
1118 SinkError::Postgres(anyhow!(err))
1119 }
1120}