1feature_gated_sink_mod!(big_query, "bigquery");
16pub mod boxed;
17pub mod catalog;
18feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
19pub mod coordinate;
20pub mod decouple_checkpoint_log_sink;
21feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
22feature_gated_sink_mod!(doris, "doris");
23#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
24pub mod doris_starrocks_connector;
25pub mod dynamodb;
26pub mod elasticsearch_opensearch;
27pub mod encoder;
28pub mod file_sink;
29pub mod formatter;
30feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
31pub mod iceberg;
32pub mod kafka;
33pub mod kinesis;
34use risingwave_common::bail;
35pub mod jdbc_jni_client;
36pub mod log_store;
37pub mod mock_coordination_client;
38pub mod mongodb;
39pub mod mqtt;
40pub mod nats;
41pub mod postgres;
42pub mod pulsar;
43pub mod redis;
44pub mod remote;
45pub mod snowflake_redshift;
46pub mod sqlserver;
47feature_gated_sink_mod!(starrocks, "starrocks");
48pub mod test_sink;
49pub mod trivial;
50pub mod utils;
51pub mod writer;
52pub mod prelude {
53 pub use crate::sink::{
54 Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
55 SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
56 };
57}
58
59use std::collections::BTreeMap;
60use std::future::Future;
61use std::sync::{Arc, LazyLock};
62
63use ::redis::RedisError;
64use anyhow::anyhow;
65use async_trait::async_trait;
66use decouple_checkpoint_log_sink::{
67 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
68 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
69};
70use futures::future::BoxFuture;
71use opendal::Error as OpendalError;
72use prometheus::Registry;
73use risingwave_common::array::ArrayError;
74use risingwave_common::bitmap::Bitmap;
75use risingwave_common::catalog::{ColumnDesc, Field, Schema};
76use risingwave_common::config::StreamingConfig;
77use risingwave_common::hash::ActorId;
78use risingwave_common::metrics::{
79 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
80 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
81};
82use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
83use risingwave_common::secret::{LocalSecretManager, SecretError};
84use risingwave_common::session_config::sink_decouple::SinkDecouple;
85use risingwave_common::{
86 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
87 register_guarded_int_gauge_vec_with_registry,
88};
89use risingwave_pb::catalog::PbSinkType;
90use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
91use risingwave_pb::id::ExecutorId;
92use risingwave_rpc_client::MetaClient;
93use risingwave_rpc_client::error::RpcError;
94use starrocks::STARROCKS_SINK;
95use thiserror::Error;
96use thiserror_ext::AsReport;
97use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
98pub use tracing;
99
100use self::catalog::{SinkFormatDesc, SinkType};
101use self::clickhouse::CLICKHOUSE_SINK;
102use self::deltalake::DELTALAKE_SINK;
103use self::iceberg::ICEBERG_SINK;
104use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
105use crate::WithPropertiesExt;
106use crate::connector_common::IcebergSinkCompactionUpdate;
107use crate::error::{ConnectorError, ConnectorResult};
108use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
109use crate::sink::catalog::desc::SinkDesc;
110use crate::sink::catalog::{SinkCatalog, SinkId};
111use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
112use crate::sink::file_sink::fs::FsSink;
113use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
114use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
115use crate::sink::utils::feature_gated_sink_mod;
116
/// Capacity for bounded channels.
///
/// NOTE(review): no consumer is visible in this part of the file; presumably used by sink
/// implementations when creating bounded channels — confirm.
const BOUNDED_CHANNEL_SIZE: usize = 16;
118#[macro_export]
119macro_rules! for_all_sinks {
120 ($macro:path $(, $arg:tt)*) => {
121 $macro! {
122 {
123 { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
124 { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
125 { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
126 { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
127 { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
128 { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
129 { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
130 { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
131 { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
132 { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
133 { Jdbc, $crate::sink::remote::JdbcSink, () },
134 { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
135 { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
136 { Cassandra, $crate::sink::remote::CassandraSink, () },
137 { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
138 { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
139 { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },
140
141 { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
142 { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
143 { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },
144
145 { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
146 { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
147 { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
148 { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
149 { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
150 { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
151 { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
152 { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
153 { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
154 { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },
155
156 { Test, $crate::sink::test_sink::TestSink, () },
157 { Table, $crate::sink::trivial::TableSink, () }
158 }
159 $(,$arg)*
160 }
161 };
162}
163
/// Callback for [`for_all_sinks!`]: emits one `generate_config_use_single!` call per sink
/// entry, forwarding that entry's config-type tokens.
///
/// The config type is captured as raw `tt`s rather than a `ty` fragment. An already-parsed
/// `ty` fragment would be opaque, so `generate_config_use_single!`'s literal `(())` arm could
/// not match it.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
172
/// Emits a `pub(super) use` for a single sink config type, or nothing for `()`.
#[macro_export]
macro_rules! generate_config_use_single {
    // Sinks without a typed config are listed with `()`: nothing to import.
    (()) => {};

    // Import the config type, visible to the parent of the module where this expands.
    ($config_type:path) => {
        // NOTE(review): presumably not every invoking module uses every config type,
        // hence the allow.
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
184
/// Expands to a `pub(super) use` item for the config type of every sink listed in
/// [`for_all_sinks!`] (sinks whose config is `()` are skipped).
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
192
/// Evaluates `$body` with `$sink` bound to the payload of whichever `SinkImpl` variant
/// `$impl` holds.
///
/// Usage: `dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink())`. The payload is the
/// boxed concrete sink; with match ergonomics it is a reference to the box when `$impl` is a
/// reference.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: called back by `for_all_sinks!` with the full sink list prepended.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Public entry point: wraps the arguments in braces so they travel as single `tt`s.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
208
/// Matches a connector-name string against each sink's `Sink::SINK_NAME`.
///
/// On a match, `$type_name` is declared as a local type alias for the concrete sink type and
/// `$body` is evaluated. Otherwise `$on_other_closure` is called with the unmatched string.
/// Arms are tried in `for_all_sinks!` order, so if two sinks shared a name the first listed
/// would win.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: called back by `for_all_sinks!` with the full sink list prepended.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Public entry point: wraps the arguments in braces so they travel as single `tt`s.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
229
/// Property key naming the sink connector; `SinkImpl::new` dispatches on its value.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Legacy property key for the sink type. `SinkParam::from_proto` reads it (together with
/// `connector`) when no format descriptor is present.
pub const SINK_TYPE_OPTION: &str = "type";
/// Property key for the `snapshot` option. NOTE(review): consumers are outside this view.
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// Presumably the accepted values of the `type` option (see `SINK_TYPE_OPTION`).
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
/// Property key for the user's `force_append_only` option.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
/// Property key for the user's `force_compaction` option.
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";
240
/// Connector-independent parameters from which a concrete sink is built (`SinkImpl::new`).
///
/// Convertible to and from its protobuf form (`PbSinkParam`), and buildable from a
/// [`SinkCatalog`] with secrets resolved (`try_from_sink_catalog`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Connector properties. When built by `try_from_sink_catalog`, secret references have
    /// already been filled in.
    pub properties: BTreeMap<String, String>,
    /// Columns of the sink. `try_from_sink_catalog` keeps only the catalog's visible columns.
    pub columns: Vec<ColumnDesc>,
    /// Downstream primary key, presumably as indices into `columns`. `None` when not
    /// specified; an empty key from protobuf is also decoded as `None`.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// NOTE(review): presumably the name of the relation the sink reads from — confirm.
    pub sink_from_name: String,
}
260
261impl SinkParam {
262 pub fn from_proto(pb_param: PbSinkParam) -> Self {
263 let table_schema = pb_param.table_schema.expect("should contain table schema");
264 let format_desc = match pb_param.format_desc {
265 Some(f) => f.try_into().ok(),
266 None => {
267 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
268 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
269 match (connector, r#type) {
270 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
271 _ => None,
272 }
273 }
274 };
275 Self {
276 sink_id: SinkId::from(pb_param.sink_id),
277 sink_name: pb_param.sink_name,
278 properties: pb_param.properties,
279 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
280 downstream_pk: if table_schema.pk_indices.is_empty() {
281 None
282 } else {
283 Some(
284 (table_schema.pk_indices.iter())
285 .map(|i| *i as usize)
286 .collect(),
287 )
288 },
289 sink_type: SinkType::from_proto(
290 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
291 ),
292 format_desc,
293 db_name: pb_param.db_name,
294 sink_from_name: pb_param.sink_from_name,
295 }
296 }
297
298 pub fn to_proto(&self) -> PbSinkParam {
299 PbSinkParam {
300 sink_id: self.sink_id,
301 sink_name: self.sink_name.clone(),
302 properties: self.properties.clone(),
303 table_schema: Some(TableSchema {
304 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
305 pk_indices: (self.downstream_pk.as_ref())
306 .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
307 }),
308 sink_type: self.sink_type.to_proto().into(),
309 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
310 db_name: self.db_name.clone(),
311 sink_from_name: self.sink_from_name.clone(),
312 }
313 }
314
315 pub fn schema(&self) -> Schema {
316 Schema {
317 fields: self.columns.iter().map(Field::from).collect(),
318 }
319 }
320
321 pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
327 self.downstream_pk.clone().unwrap_or_default()
328 }
329
330 pub fn fill_secret_for_format_desc(
333 format_desc: Option<SinkFormatDesc>,
334 ) -> Result<Option<SinkFormatDesc>> {
335 match format_desc {
336 Some(mut format_desc) => {
337 format_desc.options = LocalSecretManager::global()
338 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
339 Ok(Some(format_desc))
340 }
341 None => Ok(None),
342 }
343 }
344
345 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
347 let columns = sink_catalog
348 .visible_columns()
349 .map(|col| col.column_desc.clone())
350 .collect();
351 let properties_with_secret = LocalSecretManager::global()
352 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
353 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
354 Ok(Self {
355 sink_id: sink_catalog.id,
356 sink_name: sink_catalog.name,
357 properties: properties_with_secret,
358 columns,
359 downstream_pk: sink_catalog.downstream_pk,
360 sink_type: sink_catalog.sink_type,
361 format_desc: format_desc_with_secret,
362 db_name: sink_catalog.db_name,
363 sink_from_name: sink_catalog.sink_from_name,
364 })
365 }
366}
367
368pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
369 use crate::enforce_secret::EnforceSecret;
370
371 let connector = props
372 .get_connector()
373 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
374 let key_iter = props.key_iter();
375 match_sink_name_str!(
376 connector.as_str(),
377 PropType,
378 PropType::enforce_secret(key_iter),
379 |other| bail!("connector '{}' is not supported", other)
380 )
381}
382
/// Process-wide sink metrics, registered in `GLOBAL_METRICS_REGISTRY` on first access.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
385
/// Handles for all sink-related metric families (see [`SinkMetrics::new`] for names, help
/// texts and labels).
#[derive(Clone)]
pub struct SinkMetrics {
    // Labels: actor_id, connector, sink_id, sink_name.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    // Labels: actor_id, connector_type, sink_id, sink_name.
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store write side. Labels: actor_id, sink_id, sink_name.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store read side. Labels: actor_id, connector, sink_id, sink_name.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg writer. Labels: actor_id, sink_id, sink_name
    // (except `iceberg_snapshot_num`: sink_name, catalog_name, table_name).
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
411
412impl SinkMetrics {
413 pub fn new(registry: &Registry) -> Self {
414 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
415 "sink_commit_duration",
416 "Duration of commit op in sink",
417 &["actor_id", "connector", "sink_id", "sink_name"],
418 registry
419 )
420 .unwrap();
421
422 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
423 "connector_sink_rows_received",
424 "Number of rows received by sink",
425 &["actor_id", "connector_type", "sink_id", "sink_name"],
426 registry
427 )
428 .unwrap();
429
430 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
431 "log_store_first_write_epoch",
432 "The first write epoch of log store",
433 &["actor_id", "sink_id", "sink_name"],
434 registry
435 )
436 .unwrap();
437
438 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
439 "log_store_latest_write_epoch",
440 "The latest write epoch of log store",
441 &["actor_id", "sink_id", "sink_name"],
442 registry
443 )
444 .unwrap();
445
446 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
447 "log_store_write_rows",
448 "The write rate of rows",
449 &["actor_id", "sink_id", "sink_name"],
450 registry
451 )
452 .unwrap();
453
454 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
455 "log_store_latest_read_epoch",
456 "The latest read epoch of log store",
457 &["actor_id", "connector", "sink_id", "sink_name"],
458 registry
459 )
460 .unwrap();
461
462 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
463 "log_store_read_rows",
464 "The read rate of rows",
465 &["actor_id", "connector", "sink_id", "sink_name"],
466 registry
467 )
468 .unwrap();
469
470 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
471 "log_store_read_bytes",
472 "Total size of chunks read by log reader",
473 &["actor_id", "connector", "sink_id", "sink_name"],
474 registry
475 )
476 .unwrap();
477
478 let log_store_reader_wait_new_future_duration_ns =
479 register_guarded_int_counter_vec_with_registry!(
480 "log_store_reader_wait_new_future_duration_ns",
481 "Accumulated duration of LogReader to wait for next call to create future",
482 &["actor_id", "connector", "sink_id", "sink_name"],
483 registry
484 )
485 .unwrap();
486
487 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
488 "iceberg_write_qps",
489 "The qps of iceberg writer",
490 &["actor_id", "sink_id", "sink_name"],
491 registry
492 )
493 .unwrap();
494
495 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
496 "iceberg_write_latency",
497 "The latency of iceberg writer",
498 &["actor_id", "sink_id", "sink_name"],
499 registry
500 )
501 .unwrap();
502
503 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
504 "iceberg_rolling_unflushed_data_file",
505 "The unflushed data file count of iceberg rolling writer",
506 &["actor_id", "sink_id", "sink_name"],
507 registry
508 )
509 .unwrap();
510
511 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
512 "iceberg_position_delete_cache_num",
513 "The delete cache num of iceberg position delete writer",
514 &["actor_id", "sink_id", "sink_name"],
515 registry
516 )
517 .unwrap();
518
519 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
520 "iceberg_partition_num",
521 "The partition num of iceberg partition writer",
522 &["actor_id", "sink_id", "sink_name"],
523 registry
524 )
525 .unwrap();
526
527 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
528 "iceberg_write_bytes",
529 "The write bytes of iceberg writer",
530 &["actor_id", "sink_id", "sink_name"],
531 registry
532 )
533 .unwrap();
534
535 let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
536 "iceberg_snapshot_num",
537 "The snapshot number of iceberg table",
538 &["sink_name", "catalog_name", "table_name"],
539 registry
540 )
541 .unwrap();
542
543 Self {
544 sink_commit_duration,
545 connector_sink_rows_received,
546 log_store_first_write_epoch,
547 log_store_latest_write_epoch,
548 log_store_write_rows,
549 log_store_latest_read_epoch,
550 log_store_read_rows,
551 log_store_read_bytes,
552 log_store_reader_wait_new_future_duration_ns,
553 iceberg_write_qps,
554 iceberg_write_latency,
555 iceberg_rolling_unflushed_data_file,
556 iceberg_position_delete_cache_num,
557 iceberg_partition_num,
558 iceberg_write_bytes,
559 iceberg_snapshot_num,
560 }
561 }
562}
563
/// Per-writer parameters passed to `Sink::new_log_sinker`.
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: ExecutorId,
    // NOTE(review): presumably the vnodes owned by this writer, if any — confirm.
    pub vnode_bitmap: Option<Bitmap>,
    // Client used for sink coordination and failure event logging; `None` in `for_test`.
    pub meta_client: Option<SinkMetaClient>,
    // NOTE(review): presumably the index of an extra partition column — confirm.
    pub extra_partition_col_idx: Option<usize>,

    // `actor_id`, `connector`, `sink_id` and `sink_name` are the label values used by
    // `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}
582
/// Metric handles of one sink writer, labelled from its [`SinkWriterParam`].
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
588
589impl SinkWriterMetrics {
590 pub fn new(writer_param: &SinkWriterParam) -> Self {
591 let labels = [
592 &writer_param.actor_id.to_string(),
593 writer_param.connector.as_str(),
594 &writer_param.sink_id.to_string(),
595 writer_param.sink_name.as_str(),
596 ];
597 let sink_commit_duration = GLOBAL_SINK_METRICS
598 .sink_commit_duration
599 .with_guarded_label_values(&labels);
600 let connector_sink_rows_received = GLOBAL_SINK_METRICS
601 .connector_sink_rows_received
602 .with_guarded_label_values(&labels);
603 Self {
604 sink_commit_duration,
605 connector_sink_rows_received,
606 }
607 }
608
609 #[cfg(test)]
610 pub fn for_test() -> Self {
611 Self {
612 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
613 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
614 }
615 }
616}
617
/// Meta-service client used by sinks: either a real [`MetaClient`] or a mock for tests.
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
623
624impl SinkMetaClient {
625 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
626 match self {
627 SinkMetaClient::MetaClient(meta_client) => {
628 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
629 meta_client.sink_coordinate_client().await,
630 )
631 }
632 SinkMetaClient::MockMetaClient(mock_meta_client) => {
633 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
634 mock_meta_client.sink_coordinate_client(),
635 )
636 }
637 }
638 }
639
640 pub async fn add_sink_fail_evet_log(
641 &self,
642 sink_id: SinkId,
643 sink_name: String,
644 connector: String,
645 error: String,
646 ) {
647 match self {
648 SinkMetaClient::MetaClient(meta_client) => {
649 match meta_client
650 .add_sink_fail_evet(sink_id, sink_name, connector, error)
651 .await
652 {
653 Ok(_) => {}
654 Err(e) => {
655 tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
656 }
657 }
658 }
659 SinkMetaClient::MockMetaClient(_) => {}
660 }
661 }
662}
663
664impl SinkWriterParam {
665 pub fn for_test() -> Self {
666 SinkWriterParam {
667 executor_id: Default::default(),
668 vnode_bitmap: Default::default(),
669 meta_client: Default::default(),
670 extra_partition_col_idx: Default::default(),
671
672 actor_id: 1.into(),
673 sink_id: SinkId::new(1),
674 sink_name: "test_sink".to_owned(),
675 connector: "test_connector".to_owned(),
676 streaming_config: StreamingConfig::default(),
677 }
678 }
679}
680
681fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
682 matches!(
683 sink_name,
684 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
685 )
686}
687pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
688 const SINK_NAME: &'static str;
689
690 type LogSinker: LogSinker;
691
692 fn set_default_commit_checkpoint_interval(
693 desc: &mut SinkDesc,
694 user_specified: &SinkDecouple,
695 ) -> Result<()> {
696 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
697 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
698 Some(commit_checkpoint_interval) => {
699 let commit_checkpoint_interval = commit_checkpoint_interval
700 .parse::<u64>()
701 .map_err(|e| SinkError::Config(anyhow!(e)))?;
702 if matches!(user_specified, SinkDecouple::Disable)
703 && commit_checkpoint_interval > 1
704 {
705 return Err(SinkError::Config(anyhow!(
706 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
707 )));
708 }
709 }
710 None => match user_specified {
711 SinkDecouple::Default | SinkDecouple::Enable => {
712 if matches!(Self::SINK_NAME, ICEBERG_SINK) {
713 desc.properties.insert(
714 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
715 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
716 );
717 } else {
718 desc.properties.insert(
719 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
720 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
721 );
722 }
723 }
724 SinkDecouple::Disable => {
725 desc.properties.insert(
726 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
727 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
728 );
729 }
730 },
731 }
732 }
733 Ok(())
734 }
735
736 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
738 match user_specified {
739 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
740 SinkDecouple::Disable => Ok(false),
741 }
742 }
743
744 fn support_schema_change() -> bool {
745 false
746 }
747
748 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
749 Ok(())
750 }
751
752 async fn validate(&self) -> Result<()>;
753 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
754
755 fn is_coordinated_sink(&self) -> bool {
756 false
757 }
758
759 async fn new_coordinator(
760 &self,
761 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
762 ) -> Result<SinkCommitCoordinator> {
763 Err(SinkError::Coordinator(anyhow!("no coordinator")))
764 }
765}
766
/// Read side of a log store, as consumed by a [`LogSinker`].
pub trait SinkLogReader: Send {
    /// Prepares the reader before reading, optionally from `start_offset`.
    /// NOTE(review): offset semantics are defined by the underlying `LogReader` — confirm.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Returns the next log item together with a `u64` (presumably its epoch).
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Reports `offset` as consumed; presumably lets the log store truncate up to it.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
783
784impl<R: LogReader> SinkLogReader for &mut R {
785 fn next_item(
786 &mut self,
787 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
788 <R as LogReader>::next_item(*self)
789 }
790
791 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
792 <R as LogReader>::truncate(*self, offset)
793 }
794
795 fn start_from(
796 &mut self,
797 start_offset: Option<u64>,
798 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
799 <R as LogReader>::start_from(*self, start_offset)
800 }
801}
802
/// Consumes a log store and writes its contents to the sink.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Runs the consume loop. The `!` success type means it can only return an error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared async callback that, given a sink id, yields a `u64` and a receiver of further
/// `u64` values.
///
/// NOTE(review): presumably the current committed epoch plus a stream of subsequently
/// committed epochs — confirm.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
814
/// Commit coordinator of a coordinated sink, returned by `Sink::new_coordinator`: either
/// single-phase or two-phase commit.
pub enum SinkCommitCoordinator {
    SinglePhase(BoxSinglePhaseCoordinator),
    TwoPhase(BoxTwoPhaseCoordinator),
}
819
/// Coordinator that commits each epoch in a single step.
#[async_trait]
pub trait SinglePhaseCommitCoordinator {
    /// Initializes the coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Commits `epoch` using the writers' `metadata`.
    /// `add_columns`, presumably, carries columns added by a schema change, if any.
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}
833
/// Coordinator that commits each epoch in two phases (pre-commit, then commit or abort).
#[async_trait]
pub trait TwoPhaseCommitCoordinator {
    /// Initializes the coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Prepares `epoch` from the writers' `metadata` and returns opaque commit metadata
    /// (presumably what is later passed to `commit` / `abort`).
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<Vec<u8>>;

    /// Finalizes `epoch` with the given commit metadata.
    async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;

    /// Abandons `epoch`; cannot report failure.
    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
}
853
854impl SinkImpl {
855 pub fn new(mut param: SinkParam) -> Result<Self> {
856 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
857
858 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
860
861 let sink_type = param
862 .properties
863 .get(CONNECTOR_TYPE_KEY)
864 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
865
866 let sink_type = sink_type.to_lowercase();
867 match_sink_name_str!(
868 sink_type.as_str(),
869 SinkType,
870 Ok(SinkType::try_from(param)?.into()),
871 |other| {
872 Err(SinkError::Config(anyhow!(
873 "unsupported sink connector {}",
874 other
875 )))
876 }
877 )
878 }
879
880 pub fn is_sink_into_table(&self) -> bool {
881 matches!(self, SinkImpl::Table(_))
882 }
883
884 pub fn is_blackhole(&self) -> bool {
885 matches!(self, SinkImpl::BlackHole(_))
886 }
887
888 pub fn is_coordinated_sink(&self) -> bool {
889 dispatch_sink!(self, sink, sink.is_coordinated_sink())
890 }
891}
892
893pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
894 SinkImpl::new(param)
895}
896
// Defines `SinkImpl`, with one boxed variant per entry of `for_all_sinks!`, plus
// `From<ConcreteSink> for SinkImpl` for each concrete sink type. The zero-argument arm feeds
// the sink list back into this macro.
macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        // Payloads are boxed; presumably to keep `SinkImpl` small whatever the largest sink.
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
920
/// Result type used throughout the sink module.
pub type Result<T> = std::result::Result<T, SinkError>;
922
/// Errors produced by sinks. Most variants are per connector and wrap an `anyhow::Error` or
/// a pre-rendered message.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Also produced from `RpcError` (see the `From` impl below).
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    /// Also produced from `sea_orm::DbErr` (see the `From` impl below).
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Invalid or conflicting configuration.
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    /// Holds the report string of a `RedisError`.
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Also produced from `elasticsearch::Error` and `opensearch::Error`.
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    /// Holds the report string of an OpenDAL, Parquet or `ArrayError` error.
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Catch-all for any `anyhow::Error`; displayed transparently.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Also produced from `tiberius::error::Error`.
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Also produced from `tokio_postgres::Error`.
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Wraps a [`ConnectorError`]; displayed transparently.
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    /// Secret resolution failure (e.g. from `LocalSecretManager::fill_secrets`).
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1062
1063impl From<sea_orm::DbErr> for SinkError {
1064 fn from(err: sea_orm::DbErr) -> Self {
1065 SinkError::Iceberg(anyhow!(err))
1066 }
1067}
1068
1069impl From<OpendalError> for SinkError {
1070 fn from(error: OpendalError) -> Self {
1071 SinkError::File(error.to_report_string())
1072 }
1073}
1074
1075impl From<parquet::errors::ParquetError> for SinkError {
1076 fn from(error: parquet::errors::ParquetError) -> Self {
1077 SinkError::File(error.to_report_string())
1078 }
1079}
1080
1081impl From<ArrayError> for SinkError {
1082 fn from(error: ArrayError) -> Self {
1083 SinkError::File(error.to_report_string())
1084 }
1085}
1086
1087impl From<RpcError> for SinkError {
1088 fn from(value: RpcError) -> Self {
1089 SinkError::Remote(anyhow!(value))
1090 }
1091}
1092
1093impl From<RedisError> for SinkError {
1094 fn from(value: RedisError) -> Self {
1095 SinkError::Redis(value.to_report_string())
1096 }
1097}
1098
1099impl From<tiberius::error::Error> for SinkError {
1100 fn from(err: tiberius::error::Error) -> Self {
1101 SinkError::SqlServer(anyhow!(err))
1102 }
1103}
1104
1105impl From<::elasticsearch::Error> for SinkError {
1106 fn from(err: ::elasticsearch::Error) -> Self {
1107 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1108 }
1109}
1110
1111impl From<::opensearch::Error> for SinkError {
1112 fn from(err: ::opensearch::Error) -> Self {
1113 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1114 }
1115}
1116
1117impl From<tokio_postgres::Error> for SinkError {
1118 fn from(err: tokio_postgres::Error) -> Self {
1119 SinkError::Postgres(anyhow!(err))
1120 }
1121}