1feature_gated_sink_mod!(big_query, "bigquery");
16pub mod boxed;
17pub mod catalog;
18feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
19pub mod coordinate;
20pub mod decouple_checkpoint_log_sink;
21feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
22feature_gated_sink_mod!(doris, "doris");
23#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
24pub mod doris_starrocks_connector;
25pub mod dynamodb;
26pub mod elasticsearch_opensearch;
27pub mod encoder;
28pub mod file_sink;
29pub mod formatter;
30feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
31pub mod iceberg;
32pub mod kafka;
33pub mod kinesis;
34use risingwave_common::bail;
35use risingwave_pb::stream_plan::PbSinkSchemaChange;
36pub mod jdbc_jni_client;
37pub mod log_store;
38pub mod mock_coordination_client;
39pub mod mongodb;
40pub mod mqtt;
41pub mod nats;
42pub mod postgres;
43pub mod pulsar;
44pub mod redis;
45pub mod remote;
46pub mod snowflake_redshift;
47pub mod sqlserver;
48feature_gated_sink_mod!(starrocks, "starrocks");
49pub mod test_sink;
50pub mod trivial;
51pub mod utils;
52pub mod writer;
/// Convenience re-exports of frequently used items from `crate::sink`.
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}
59
60use std::collections::BTreeMap;
61use std::future::Future;
62use std::sync::{Arc, LazyLock};
63
64use ::redis::RedisError;
65use anyhow::anyhow;
66use async_trait::async_trait;
67use decouple_checkpoint_log_sink::{
68 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
69 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
70};
71use futures::future::BoxFuture;
72use opendal::Error as OpendalError;
73use prometheus::Registry;
74use risingwave_common::array::ArrayError;
75use risingwave_common::bitmap::Bitmap;
76use risingwave_common::catalog::{ColumnDesc, Field, Schema};
77use risingwave_common::config::StreamingConfig;
78use risingwave_common::hash::ActorId;
79use risingwave_common::metrics::{
80 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
81 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
82};
83use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
84use risingwave_common::secret::{LocalSecretManager, SecretError};
85use risingwave_common::session_config::sink_decouple::SinkDecouple;
86use risingwave_common::{
87 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
88 register_guarded_int_gauge_vec_with_registry,
89};
90use risingwave_pb::catalog::PbSinkType;
91use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
92use risingwave_pb::id::ExecutorId;
93use risingwave_rpc_client::MetaClient;
94use risingwave_rpc_client::error::RpcError;
95use starrocks::STARROCKS_SINK;
96use thiserror::Error;
97use thiserror_ext::AsReport;
98use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
99pub use tracing;
100
101use self::catalog::{SinkFormatDesc, SinkType};
102use self::clickhouse::CLICKHOUSE_SINK;
103use self::deltalake::DELTALAKE_SINK;
104use self::iceberg::ICEBERG_SINK;
105use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
106use crate::WithPropertiesExt;
107use crate::connector_common::IcebergSinkCompactionUpdate;
108use crate::error::{ConnectorError, ConnectorResult};
109use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
110use crate::sink::catalog::desc::SinkDesc;
111use crate::sink::catalog::{SinkCatalog, SinkId};
112use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
113use crate::sink::file_sink::fs::FsSink;
114use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
115use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
116use crate::sink::utils::feature_gated_sink_mod;
117
// Presumably the capacity used when creating bounded channels in this module.
// NOTE(review): no user of this constant is visible in this part of the file — confirm.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes the callback macro `$macro` with the full table of sinks.
///
/// The callback receives one brace-delimited list of
/// `{ VariantName, SinkType, ConfigType }` entries, followed by any extra `$arg` token
/// trees passed by the caller. Several entries use `()` in the config position.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
164
/// Callback for `for_all_sinks!`: for every sink entry, forwards the entry's config-type
/// tokens to `generate_config_use_single!`. The variant name and sink type are matched
/// but not used.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
173
/// Emits a `pub(super) use` item for a single config type path, or nothing when the
/// config position is `()`.
#[macro_export]
macro_rules! generate_config_use_single {
    // `()` in the config position: there is nothing to import.
    (()) => {};

    // Any other path is re-exported to the parent module; the import may end up unused.
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
185
/// Expands to one `pub(super) use` item per sink config type by driving
/// `generate_config_use_clauses!` through `for_all_sinks!`.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
193
/// Matches on a `SinkImpl` value and evaluates `$body` with `$sink` bound to the payload
/// of whichever variant matched.
///
/// Callers use the three-argument form `dispatch_sink!(impl_expr, binding, body)`; it
/// re-enters the first arm through `for_all_sinks!`, which supplies the sink table.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: receives the sink table and generates one match arm per variant.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Public arm: wraps the expressions in braces so they travel as single token trees.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
209
/// Matches a connector name string against `SINK_NAME` of every sink type.
///
/// In the matching arm, `$type_name` is declared as a type alias for that sink type and
/// `$body` is evaluated. If no sink name matches, `$on_other_closure` is called with the
/// unmatched string.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: receives the sink table and generates one match arm per sink.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Public arm: forwards through `for_all_sinks!`, which supplies the sink table.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
230
/// Property key holding the connector name; read by `SinkImpl::new` and `SinkParam::from_proto`.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key holding the legacy sink type; read by `SinkParam::from_proto`.
pub const SINK_TYPE_OPTION: &str = "type";
// NOTE(review): the remaining constants are not used in the visible part of this file;
// judging by their names they are option keys / option values — confirm against callers.
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";
241
/// Parameters describing a sink, convertible to and from `PbSinkParam` and buildable
/// from a `SinkCatalog`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Connector properties. `SinkImpl::new` looks up the `connector` key here to pick
    /// the sink implementation.
    pub properties: BTreeMap<String, String>,
    /// Column descriptors of the sink's schema (see `schema()`).
    pub columns: Vec<ColumnDesc>,
    /// Primary-key column indices. `None` is encoded as an empty `pk_indices` list in
    /// the protobuf form, and an empty list decodes back to `None`.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    pub ignore_delete: bool,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    pub sink_from_name: String,
}
263
264impl SinkParam {
265 pub fn from_proto(pb_param: PbSinkParam) -> Self {
266 let ignore_delete = pb_param.ignore_delete();
267 let table_schema = pb_param.table_schema.expect("should contain table schema");
268 let format_desc = match pb_param.format_desc {
269 Some(f) => f.try_into().ok(),
270 None => {
271 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
272 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
273 match (connector, r#type) {
274 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
275 _ => None,
276 }
277 }
278 };
279 Self {
280 sink_id: SinkId::from(pb_param.sink_id),
281 sink_name: pb_param.sink_name,
282 properties: pb_param.properties,
283 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
284 downstream_pk: if table_schema.pk_indices.is_empty() {
285 None
286 } else {
287 Some(
288 (table_schema.pk_indices.iter())
289 .map(|i| *i as usize)
290 .collect(),
291 )
292 },
293 sink_type: SinkType::from_proto(
294 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
295 ),
296 ignore_delete,
297 format_desc,
298 db_name: pb_param.db_name,
299 sink_from_name: pb_param.sink_from_name,
300 }
301 }
302
303 pub fn to_proto(&self) -> PbSinkParam {
304 PbSinkParam {
305 sink_id: self.sink_id,
306 sink_name: self.sink_name.clone(),
307 properties: self.properties.clone(),
308 table_schema: Some(TableSchema {
309 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
310 pk_indices: (self.downstream_pk.as_ref())
311 .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
312 }),
313 sink_type: self.sink_type.to_proto().into(),
314 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
315 db_name: self.db_name.clone(),
316 sink_from_name: self.sink_from_name.clone(),
317 raw_ignore_delete: self.ignore_delete,
318 }
319 }
320
321 pub fn schema(&self) -> Schema {
322 Schema {
323 fields: self.columns.iter().map(Field::from).collect(),
324 }
325 }
326
327 pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
333 self.downstream_pk.clone().unwrap_or_default()
334 }
335
336 pub fn fill_secret_for_format_desc(
339 format_desc: Option<SinkFormatDesc>,
340 ) -> Result<Option<SinkFormatDesc>> {
341 match format_desc {
342 Some(mut format_desc) => {
343 format_desc.options = LocalSecretManager::global()
344 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
345 Ok(Some(format_desc))
346 }
347 None => Ok(None),
348 }
349 }
350
351 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
353 let columns = sink_catalog
354 .visible_columns()
355 .map(|col| col.column_desc.clone())
356 .collect();
357 let properties_with_secret = LocalSecretManager::global()
358 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
359 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
360 Ok(Self {
361 sink_id: sink_catalog.id,
362 sink_name: sink_catalog.name,
363 properties: properties_with_secret,
364 columns,
365 downstream_pk: sink_catalog.downstream_pk,
366 sink_type: sink_catalog.sink_type,
367 ignore_delete: sink_catalog.ignore_delete,
368 format_desc: format_desc_with_secret,
369 db_name: sink_catalog.db_name,
370 sink_from_name: sink_catalog.sink_from_name,
371 })
372 }
373}
374
375pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
376 use crate::enforce_secret::EnforceSecret;
377
378 let connector = props
379 .get_connector()
380 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
381 let key_iter = props.key_iter();
382 match_sink_name_str!(
383 connector.as_str(),
384 PropType,
385 PropType::enforce_secret(key_iter),
386 |other| bail!("connector '{}' is not supported", other)
387 )
388}
389
/// Process-wide sink metrics, lazily registered in `GLOBAL_METRICS_REGISTRY` on first access.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
392
/// Label-guarded metric vectors for sinks, the log store and the iceberg writer.
/// The label sets are defined where each vector is registered in `SinkMetrics::new`.
#[derive(Clone)]
pub struct SinkMetrics {
    /// Duration of commit op in sink.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    /// Number of rows received by sink.
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    /// The first write epoch of log store.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    /// The latest write epoch of log store.
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    /// The write rate of rows.
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    /// The latest read epoch of log store.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    /// The read rate of rows.
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    /// Total size of chunks read by log reader.
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    /// Accumulated duration of `LogReader` to wait for next call to create future.
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    /// The qps of iceberg writer.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    /// The latency of iceberg writer.
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    /// The unflushed data file count of iceberg rolling writer.
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    /// The delete cache num of iceberg position delete writer.
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    /// The partition num of iceberg partition writer.
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    /// The write bytes of iceberg writer.
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    /// The snapshot number of iceberg table.
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
418
419impl SinkMetrics {
420 pub fn new(registry: &Registry) -> Self {
421 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
422 "sink_commit_duration",
423 "Duration of commit op in sink",
424 &["actor_id", "connector", "sink_id", "sink_name"],
425 registry
426 )
427 .unwrap();
428
429 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
430 "connector_sink_rows_received",
431 "Number of rows received by sink",
432 &["actor_id", "connector_type", "sink_id", "sink_name"],
433 registry
434 )
435 .unwrap();
436
437 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
438 "log_store_first_write_epoch",
439 "The first write epoch of log store",
440 &["actor_id", "sink_id", "sink_name"],
441 registry
442 )
443 .unwrap();
444
445 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
446 "log_store_latest_write_epoch",
447 "The latest write epoch of log store",
448 &["actor_id", "sink_id", "sink_name"],
449 registry
450 )
451 .unwrap();
452
453 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
454 "log_store_write_rows",
455 "The write rate of rows",
456 &["actor_id", "sink_id", "sink_name"],
457 registry
458 )
459 .unwrap();
460
461 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
462 "log_store_latest_read_epoch",
463 "The latest read epoch of log store",
464 &["actor_id", "connector", "sink_id", "sink_name"],
465 registry
466 )
467 .unwrap();
468
469 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
470 "log_store_read_rows",
471 "The read rate of rows",
472 &["actor_id", "connector", "sink_id", "sink_name"],
473 registry
474 )
475 .unwrap();
476
477 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
478 "log_store_read_bytes",
479 "Total size of chunks read by log reader",
480 &["actor_id", "connector", "sink_id", "sink_name"],
481 registry
482 )
483 .unwrap();
484
485 let log_store_reader_wait_new_future_duration_ns =
486 register_guarded_int_counter_vec_with_registry!(
487 "log_store_reader_wait_new_future_duration_ns",
488 "Accumulated duration of LogReader to wait for next call to create future",
489 &["actor_id", "connector", "sink_id", "sink_name"],
490 registry
491 )
492 .unwrap();
493
494 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
495 "iceberg_write_qps",
496 "The qps of iceberg writer",
497 &["actor_id", "sink_id", "sink_name"],
498 registry
499 )
500 .unwrap();
501
502 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
503 "iceberg_write_latency",
504 "The latency of iceberg writer",
505 &["actor_id", "sink_id", "sink_name"],
506 registry
507 )
508 .unwrap();
509
510 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
511 "iceberg_rolling_unflushed_data_file",
512 "The unflushed data file count of iceberg rolling writer",
513 &["actor_id", "sink_id", "sink_name"],
514 registry
515 )
516 .unwrap();
517
518 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
519 "iceberg_position_delete_cache_num",
520 "The delete cache num of iceberg position delete writer",
521 &["actor_id", "sink_id", "sink_name"],
522 registry
523 )
524 .unwrap();
525
526 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
527 "iceberg_partition_num",
528 "The partition num of iceberg partition writer",
529 &["actor_id", "sink_id", "sink_name"],
530 registry
531 )
532 .unwrap();
533
534 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
535 "iceberg_write_bytes",
536 "The write bytes of iceberg writer",
537 &["actor_id", "sink_id", "sink_name"],
538 registry
539 )
540 .unwrap();
541
542 let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
543 "iceberg_snapshot_num",
544 "The snapshot number of iceberg table",
545 &["sink_name", "catalog_name", "table_name"],
546 registry
547 )
548 .unwrap();
549
550 Self {
551 sink_commit_duration,
552 connector_sink_rows_received,
553 log_store_first_write_epoch,
554 log_store_latest_write_epoch,
555 log_store_write_rows,
556 log_store_latest_read_epoch,
557 log_store_read_rows,
558 log_store_read_bytes,
559 log_store_reader_wait_new_future_duration_ns,
560 iceberg_write_qps,
561 iceberg_write_latency,
562 iceberg_rolling_unflushed_data_file,
563 iceberg_position_delete_cache_num,
564 iceberg_partition_num,
565 iceberg_write_bytes,
566 iceberg_snapshot_num,
567 }
568 }
569}
570
/// Parameters handed to `Sink::new_log_sinker` when a log sinker is created.
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: ExecutorId,
    pub vnode_bitmap: Option<Bitmap>,
    /// Client used to reach the meta service (real or mock), if any.
    pub meta_client: Option<SinkMetaClient>,
    // NOTE(review): presumably the index of an extra partition column — confirm with users.
    pub extra_partition_col_idx: Option<usize>,

    // The four fields below are used as metric label values by `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}
589
/// Per-writer metric handles taken from `GLOBAL_SINK_METRICS` with the writer's label
/// values already applied.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
595
596impl SinkWriterMetrics {
597 pub fn new(writer_param: &SinkWriterParam) -> Self {
598 let labels = [
599 &writer_param.actor_id.to_string(),
600 writer_param.connector.as_str(),
601 &writer_param.sink_id.to_string(),
602 writer_param.sink_name.as_str(),
603 ];
604 let sink_commit_duration = GLOBAL_SINK_METRICS
605 .sink_commit_duration
606 .with_guarded_label_values(&labels);
607 let connector_sink_rows_received = GLOBAL_SINK_METRICS
608 .connector_sink_rows_received
609 .with_guarded_label_values(&labels);
610 Self {
611 sink_commit_duration,
612 connector_sink_rows_received,
613 }
614 }
615
616 #[cfg(test)]
617 pub fn for_test() -> Self {
618 Self {
619 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
620 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
621 }
622 }
623}
624
/// Meta-service client used by sinks: either the real `MetaClient` or a `MockMetaClient`.
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
630
631impl SinkMetaClient {
632 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
633 match self {
634 SinkMetaClient::MetaClient(meta_client) => {
635 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
636 meta_client.sink_coordinate_client().await,
637 )
638 }
639 SinkMetaClient::MockMetaClient(mock_meta_client) => {
640 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
641 mock_meta_client.sink_coordinate_client(),
642 )
643 }
644 }
645 }
646
647 pub async fn add_sink_fail_evet_log(
648 &self,
649 sink_id: SinkId,
650 sink_name: String,
651 connector: String,
652 error: String,
653 ) {
654 match self {
655 SinkMetaClient::MetaClient(meta_client) => {
656 match meta_client
657 .add_sink_fail_evet(sink_id, sink_name, connector, error)
658 .await
659 {
660 Ok(_) => {}
661 Err(e) => {
662 tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
663 }
664 }
665 }
666 SinkMetaClient::MockMetaClient(_) => {}
667 }
668 }
669}
670
671impl SinkWriterParam {
672 pub fn for_test() -> Self {
673 SinkWriterParam {
674 executor_id: Default::default(),
675 vnode_bitmap: Default::default(),
676 meta_client: Default::default(),
677 extra_partition_col_idx: Default::default(),
678
679 actor_id: 1.into(),
680 sink_id: SinkId::new(1),
681 sink_name: "test_sink".to_owned(),
682 connector: "test_connector".to_owned(),
683 streaming_config: StreamingConfig::default(),
684 }
685 }
686}
687
688fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
689 matches!(
690 sink_name,
691 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
692 )
693}
694pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
695 const SINK_NAME: &'static str;
696
697 type LogSinker: LogSinker;
698
699 fn set_default_commit_checkpoint_interval(
700 desc: &mut SinkDesc,
701 user_specified: &SinkDecouple,
702 ) -> Result<()> {
703 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
704 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
705 Some(commit_checkpoint_interval) => {
706 let commit_checkpoint_interval = commit_checkpoint_interval
707 .parse::<u64>()
708 .map_err(|e| SinkError::Config(anyhow!(e)))?;
709 if matches!(user_specified, SinkDecouple::Disable)
710 && commit_checkpoint_interval > 1
711 {
712 return Err(SinkError::Config(anyhow!(
713 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
714 )));
715 }
716 }
717 None => match user_specified {
718 SinkDecouple::Default | SinkDecouple::Enable => {
719 if matches!(Self::SINK_NAME, ICEBERG_SINK) {
720 desc.properties.insert(
721 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
722 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
723 );
724 } else {
725 desc.properties.insert(
726 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
727 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
728 );
729 }
730 }
731 SinkDecouple::Disable => {
732 desc.properties.insert(
733 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
734 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
735 );
736 }
737 },
738 }
739 }
740 Ok(())
741 }
742
743 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
745 match user_specified {
746 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
747 SinkDecouple::Disable => Ok(false),
748 }
749 }
750
751 fn support_schema_change() -> bool {
752 false
753 }
754
755 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
756 Ok(())
757 }
758
759 async fn validate(&self) -> Result<()>;
760 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
761
762 fn is_coordinated_sink(&self) -> bool {
763 false
764 }
765
766 async fn new_coordinator(
767 &self,
768 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
769 ) -> Result<SinkCommitCoordinator> {
770 Err(SinkError::Coordinator(anyhow!("no coordinator")))
771 }
772}
773
/// Reader interface handed to a `LogSinker`. The blanket impl for `&mut R` forwards each
/// method to the identically named `LogReader` method.
pub trait SinkLogReader: Send {
    /// Starts reading from `start_offset`.
    // NOTE(review): the meaning of `None` is defined by the underlying `LogReader` — confirm.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Yields the next log item together with a `u64` (presumably its epoch — confirm).
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log at `offset`.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
790
791impl<R: LogReader> SinkLogReader for &mut R {
792 fn next_item(
793 &mut self,
794 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
795 <R as LogReader>::next_item(*self)
796 }
797
798 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
799 <R as LogReader>::truncate(*self, offset)
800 }
801
802 fn start_from(
803 &mut self,
804 start_offset: Option<u64>,
805 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
806 <R as LogReader>::start_from(*self, start_offset)
807 }
808}
809
/// Consumer that reads from a log and writes to the external system.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Consumes `log_reader` and sinks its content. The `Ok` type is `!`, so the returned
    /// future can only complete with an error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared, thread-safe callback that takes a `SinkId` and returns a boxed future
/// resolving to a `u64` plus an unbounded receiver of further `u64` values.
// NOTE(review): presumably the last committed epoch and a stream of later committed
// epochs — confirm against the subscriber implementation.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
821
/// Commit coordinator returned by `Sink::new_coordinator`: a boxed single-phase or
/// two-phase coordinator.
pub enum SinkCommitCoordinator {
    SinglePhase(BoxSinglePhaseCoordinator),
    TwoPhase(BoxTwoPhaseCoordinator),
}
826
827#[async_trait]
828pub trait SinglePhaseCommitCoordinator {
829 async fn init(&mut self) -> Result<()>;
831
832 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
834
835 async fn commit_schema_change(
838 &mut self,
839 _epoch: u64,
840 _schema_change: PbSinkSchemaChange,
841 ) -> Result<()> {
842 Err(SinkError::Coordinator(anyhow!(
843 "Schema change is not implemented for single-phase commit coordinator {}",
844 std::any::type_name::<Self>()
845 )))
846 }
847}
848
849#[async_trait]
850pub trait TwoPhaseCommitCoordinator {
851 async fn init(&mut self) -> Result<()>;
853
854 async fn pre_commit(
856 &mut self,
857 epoch: u64,
858 metadata: Vec<SinkMetadata>,
859 schema_change: Option<PbSinkSchemaChange>,
860 ) -> Result<Option<Vec<u8>>>;
861
862 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;
864
865 async fn commit_schema_change(
868 &mut self,
869 _epoch: u64,
870 _schema_change: PbSinkSchemaChange,
871 ) -> Result<()> {
872 Err(SinkError::Coordinator(anyhow!(
873 "Schema change is not implemented for two-phase commit coordinator {}",
874 std::any::type_name::<Self>()
875 )))
876 }
877
878 async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
880}
881
882impl SinkImpl {
883 pub fn new(mut param: SinkParam) -> Result<Self> {
884 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
885
886 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
888
889 let sink_type = param
890 .properties
891 .get(CONNECTOR_TYPE_KEY)
892 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
893
894 let sink_type = sink_type.to_lowercase();
895 match_sink_name_str!(
896 sink_type.as_str(),
897 SinkType,
898 Ok(SinkType::try_from(param)?.into()),
899 |other| {
900 Err(SinkError::Config(anyhow!(
901 "unsupported sink connector {}",
902 other
903 )))
904 }
905 )
906 }
907
908 pub fn is_sink_into_table(&self) -> bool {
909 matches!(self, SinkImpl::Table(_))
910 }
911
912 pub fn is_blackhole(&self) -> bool {
913 matches!(self, SinkImpl::BlackHole(_))
914 }
915
916 pub fn is_coordinated_sink(&self) -> bool {
917 dispatch_sink!(self, sink, sink.is_coordinated_sink())
918 }
919}
920
/// Builds a sink from `param`; a thin wrapper around `SinkImpl::new`.
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
924
// Defines the `SinkImpl` enum from the sink table of `for_all_sinks!`: one variant per
// sink holding the boxed sink, plus a `From<sink>` impl for each variant.
macro_rules! def_sink_impl {
    // Entry point: re-enters this macro with the sink table.
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
948
/// Result type of sink operations, with `SinkError` as the error type.
pub type Result<T> = std::result::Result<T, SinkError>;
950
/// Error type of the sink module.
///
/// Most variants are tagged by connector and wrap either an `anyhow::Error` (kept as the
/// error source) or a pre-rendered message string. `Internal` and `Connector` are
/// transparent wrappers around their inner error.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Invalid or conflicting sink configuration.
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Failure reported by or about a commit coordinator.
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    /// File sink error; also the target of the `OpendalError`, `ParquetError` and
    /// `ArrayError` conversions in this file.
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Any other `anyhow::Error`, displayed transparently.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// A `ConnectorError`, displayed transparently.
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1090
1091impl From<sea_orm::DbErr> for SinkError {
1092 fn from(err: sea_orm::DbErr) -> Self {
1093 SinkError::Iceberg(anyhow!(err))
1094 }
1095}
1096
1097impl From<OpendalError> for SinkError {
1098 fn from(error: OpendalError) -> Self {
1099 SinkError::File(error.to_report_string())
1100 }
1101}
1102
1103impl From<parquet::errors::ParquetError> for SinkError {
1104 fn from(error: parquet::errors::ParquetError) -> Self {
1105 SinkError::File(error.to_report_string())
1106 }
1107}
1108
1109impl From<ArrayError> for SinkError {
1110 fn from(error: ArrayError) -> Self {
1111 SinkError::File(error.to_report_string())
1112 }
1113}
1114
1115impl From<RpcError> for SinkError {
1116 fn from(value: RpcError) -> Self {
1117 SinkError::Remote(anyhow!(value))
1118 }
1119}
1120
1121impl From<RedisError> for SinkError {
1122 fn from(value: RedisError) -> Self {
1123 SinkError::Redis(value.to_report_string())
1124 }
1125}
1126
1127impl From<tiberius::error::Error> for SinkError {
1128 fn from(err: tiberius::error::Error) -> Self {
1129 SinkError::SqlServer(anyhow!(err))
1130 }
1131}
1132
1133impl From<::elasticsearch::Error> for SinkError {
1134 fn from(err: ::elasticsearch::Error) -> Self {
1135 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1136 }
1137}
1138
1139impl From<::opensearch::Error> for SinkError {
1140 fn from(err: ::opensearch::Error) -> Self {
1141 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1142 }
1143}
1144
1145impl From<tokio_postgres::Error> for SinkError {
1146 fn from(err: tokio_postgres::Error) -> Self {
1147 SinkError::Postgres(anyhow!(err))
1148 }
1149}