1feature_gated_sink_mod!(big_query, "bigquery");
16pub mod boxed;
17pub mod catalog;
18feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
19pub mod coordinate;
20pub mod decouple_checkpoint_log_sink;
21feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
22feature_gated_sink_mod!(doris, "doris");
23#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
24pub mod doris_starrocks_connector;
25pub mod dynamodb;
26pub mod elasticsearch_opensearch;
27pub mod encoder;
28pub mod file_sink;
29pub mod formatter;
30feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
31pub mod iceberg;
32pub mod kafka;
33pub mod kinesis;
34use risingwave_common::bail;
35use risingwave_pb::stream_plan::PbSinkSchemaChange;
36pub mod jdbc_jni_client;
37pub mod log_store;
38pub mod mock_coordination_client;
39pub mod mongodb;
40pub mod mqtt;
41pub mod nats;
42pub mod postgres;
43pub mod pulsar;
44pub mod redis;
45pub mod remote;
46pub mod snowflake_redshift;
47pub mod sqlserver;
48feature_gated_sink_mod!(starrocks, "starrocks");
49pub mod test_sink;
50pub mod trivial;
51pub mod utils;
52pub mod writer;
/// Re-exports of the sink items listed below, so that a single glob import of
/// this module brings them into scope.
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}
59
60use std::collections::BTreeMap;
61use std::future::Future;
62use std::sync::{Arc, LazyLock};
63
64use ::redis::RedisError;
65use anyhow::anyhow;
66use async_trait::async_trait;
67use decouple_checkpoint_log_sink::{
68 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
69 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
70};
71use futures::future::BoxFuture;
72use opendal::Error as OpendalError;
73use prometheus::Registry;
74use risingwave_common::array::ArrayError;
75use risingwave_common::bitmap::Bitmap;
76use risingwave_common::catalog::{ColumnDesc, Field, Schema};
77use risingwave_common::config::StreamingConfig;
78use risingwave_common::hash::ActorId;
79use risingwave_common::metrics::{
80 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
81 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
82};
83use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
84use risingwave_common::secret::{LocalSecretManager, SecretError};
85use risingwave_common::session_config::sink_decouple::SinkDecouple;
86use risingwave_common::{
87 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
88 register_guarded_int_gauge_vec_with_registry,
89};
90use risingwave_pb::catalog::PbSinkType;
91use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
92use risingwave_pb::id::ExecutorId;
93use risingwave_rpc_client::MetaClient;
94use risingwave_rpc_client::error::RpcError;
95use starrocks::STARROCKS_SINK;
96use thiserror::Error;
97use thiserror_ext::AsReport;
98use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
99pub use tracing;
100
101use self::catalog::{SinkFormatDesc, SinkType};
102use self::clickhouse::CLICKHOUSE_SINK;
103use self::deltalake::DELTALAKE_SINK;
104use self::iceberg::ICEBERG_SINK;
105use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
106use crate::WithPropertiesExt;
107use crate::connector_common::IcebergSinkCompactionUpdate;
108use crate::error::{ConnectorError, ConnectorResult};
109use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
110use crate::sink::catalog::desc::SinkDesc;
111use crate::sink::catalog::{SinkCatalog, SinkId};
112use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
113use crate::sink::file_sink::fs::FsSink;
114use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
115use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
116use crate::sink::utils::feature_gated_sink_mod;
117
/// Presumably the capacity used when creating bounded channels in this module.
/// NOTE(review): no use site is visible in this part of the file — confirm.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes the macro at path `$macro` with the full table of sinks.
///
/// The callee receives, as its first argument, a brace-delimited and
/// comma-separated list of `{ Variant, SinkType, ConfigType }` triples, one per
/// sink, followed by any extra `$arg` token trees passed to this macro
/// unchanged. A config type of `()` is used for entries that have no config
/// type to name.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
164
/// Callback for `for_all_sinks!`: for every `{ Variant, SinkType, ConfigType }`
/// entry, forwards the config-type tokens to `generate_config_use_single!`.
#[macro_export]
macro_rules! generate_config_use_clauses {
    // The config type is captured as raw token trees so that both `()` and a
    // path can be passed on and told apart by the callee.
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
173
/// Emits a `pub(super) use` for one sink config type, or nothing when the
/// config type is the unit placeholder `()`.
#[macro_export]
macro_rules! generate_config_use_single {
    // `()` placeholder: there is no config type to import.
    (()) => {};

    // A real config type path: re-export it to the parent module.
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
185
/// Expands to one `pub(super) use` item per sink config type listed in
/// `for_all_sinks!`, by driving `generate_config_use_clauses!` over the table.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
193
/// Matches on a `SinkImpl` value and evaluates `$body` with `$sink` bound to
/// the payload of whichever variant is present.
///
/// Call it with the second form, `dispatch_sink!(impl_expr, binding, body)`;
/// the first form is the internal callback invoked through `for_all_sinks!`.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: receives the sink table and generates one match arm per variant.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Public arm: wraps the expressions in braces so they travel as single token trees.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
209
/// Matches a connector name string against every sink's `Sink::SINK_NAME`.
///
/// On a match, `$body` is evaluated with `$type_name` defined as a local type
/// alias for the matching sink type. If no sink name matches, the closure
/// `$on_other_closure` is called with the unmatched string.
///
/// Call it with the second form; the first form is the internal callback
/// invoked through `for_all_sinks!`.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: receives the sink table and generates one match arm per sink.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    // The alias is scoped to this arm, so each arm may use the same name.
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Public arm: wraps the expressions in braces so they travel as single token trees.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
230
/// Property key whose value names the sink connector (looked up in `SinkImpl::new`).
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key holding the legacy sink type (read in `SinkParam::from_proto`
/// when no format description is present).
pub const SINK_TYPE_OPTION: &str = "type";
/// Property key `snapshot`. NOTE(review): not used in the visible part of this file.
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
/// The string `append-only`; presumably a value of the `type` option — confirm.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
/// The string `debezium`; presumably a value of the `type` option — confirm.
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
/// The string `upsert`; presumably a value of the `type` option — confirm.
pub const SINK_TYPE_UPSERT: &str = "upsert";
/// The string `retract`; presumably a value of the `type` option — confirm.
pub const SINK_TYPE_RETRACT: &str = "retract";
/// User-facing option name `ignore_delete`.
pub const SINK_USER_IGNORE_DELETE_OPTION: &str = "ignore_delete";
/// User-facing option name `force_append_only`.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
/// User-facing option name `force_compaction`.
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";
244
/// Parameters describing one sink, convertible to and from `PbSinkParam` and
/// constructible from a `SinkCatalog`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    /// Identifier of the sink.
    pub sink_id: SinkId,
    /// Name of the sink.
    pub sink_name: String,
    /// Key/value properties of the sink; `SinkImpl::new` reads the connector
    /// name from here under `CONNECTOR_TYPE_KEY`.
    pub properties: BTreeMap<String, String>,
    /// Column descriptors of the sink; `schema()` derives its fields from these.
    pub columns: Vec<ColumnDesc>,
    /// Downstream primary key as column indices. `None` is encoded as an
    /// empty `pk_indices` list in the protobuf form.
    pub downstream_pk: Option<Vec<usize>>,
    /// The sink type.
    pub sink_type: SinkType,
    /// Flag carried in the protobuf form as `raw_ignore_delete`.
    pub ignore_delete: bool,
    /// Optional format description of the sink.
    pub format_desc: Option<SinkFormatDesc>,
    /// Database name carried with the sink.
    pub db_name: String,

    /// Name of the object the sink reads from, as carried in the catalog/protobuf.
    pub sink_from_name: String,
}
266
267impl SinkParam {
268 pub fn from_proto(pb_param: PbSinkParam) -> Self {
269 let ignore_delete = pb_param.ignore_delete();
270 let table_schema = pb_param.table_schema.expect("should contain table schema");
271 let format_desc = match pb_param.format_desc {
272 Some(f) => f.try_into().ok(),
273 None => {
274 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
275 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
276 match (connector, r#type) {
277 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
278 _ => None,
279 }
280 }
281 };
282 Self {
283 sink_id: SinkId::from(pb_param.sink_id),
284 sink_name: pb_param.sink_name,
285 properties: pb_param.properties,
286 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
287 downstream_pk: if table_schema.pk_indices.is_empty() {
288 None
289 } else {
290 Some(
291 (table_schema.pk_indices.iter())
292 .map(|i| *i as usize)
293 .collect(),
294 )
295 },
296 sink_type: SinkType::from_proto(
297 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
298 ),
299 ignore_delete,
300 format_desc,
301 db_name: pb_param.db_name,
302 sink_from_name: pb_param.sink_from_name,
303 }
304 }
305
306 pub fn to_proto(&self) -> PbSinkParam {
307 PbSinkParam {
308 sink_id: self.sink_id,
309 sink_name: self.sink_name.clone(),
310 properties: self.properties.clone(),
311 table_schema: Some(TableSchema {
312 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
313 pk_indices: (self.downstream_pk.as_ref())
314 .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
315 }),
316 sink_type: self.sink_type.to_proto().into(),
317 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
318 db_name: self.db_name.clone(),
319 sink_from_name: self.sink_from_name.clone(),
320 raw_ignore_delete: self.ignore_delete,
321 }
322 }
323
324 pub fn schema(&self) -> Schema {
325 Schema {
326 fields: self.columns.iter().map(Field::from).collect(),
327 }
328 }
329
330 pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
336 self.downstream_pk.clone().unwrap_or_default()
337 }
338
339 pub fn fill_secret_for_format_desc(
342 format_desc: Option<SinkFormatDesc>,
343 ) -> Result<Option<SinkFormatDesc>> {
344 match format_desc {
345 Some(mut format_desc) => {
346 format_desc.options = LocalSecretManager::global()
347 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
348 Ok(Some(format_desc))
349 }
350 None => Ok(None),
351 }
352 }
353
354 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
356 let columns = sink_catalog
357 .visible_columns()
358 .map(|col| col.column_desc.clone())
359 .collect();
360 let properties_with_secret = LocalSecretManager::global()
361 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
362 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
363 Ok(Self {
364 sink_id: sink_catalog.id,
365 sink_name: sink_catalog.name,
366 properties: properties_with_secret,
367 columns,
368 downstream_pk: sink_catalog.downstream_pk,
369 sink_type: sink_catalog.sink_type,
370 ignore_delete: sink_catalog.ignore_delete,
371 format_desc: format_desc_with_secret,
372 db_name: sink_catalog.db_name,
373 sink_from_name: sink_catalog.sink_from_name,
374 })
375 }
376}
377
378pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
379 use crate::enforce_secret::EnforceSecret;
380
381 let connector = props
382 .get_connector()
383 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
384 let key_iter = props.key_iter();
385 match_sink_name_str!(
386 connector.as_str(),
387 PropType,
388 PropType::enforce_secret(key_iter),
389 |other| bail!("connector '{}' is not supported", other)
390 )
391}
392
/// Process-wide sink metrics, created on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
395
/// Label-guarded metric vectors for sinks, the log store and the iceberg
/// writer. See `SinkMetrics::new` for each metric's name, help text and labels.
#[derive(Clone)]
pub struct SinkMetrics {
    /// Duration of commit op in sink.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    /// Number of rows received by sink.
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    /// The first write epoch of log store.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    /// The latest write epoch of log store.
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    /// Counter of rows written to the log store.
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    /// The latest read epoch of log store.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    /// Counter of rows read from the log store.
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    /// Total size of chunks read by log reader.
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    /// Accumulated duration of LogReader to wait for next call to create future.
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    /// Counter registered as `iceberg_write_qps`.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    /// The latency of iceberg writer.
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    /// The unflushed data file count of iceberg rolling writer.
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    /// The delete cache num of iceberg position delete writer.
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    /// The partition num of iceberg partition writer.
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    /// The write bytes of iceberg writer.
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    /// The snapshot number of iceberg table.
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
421
impl SinkMetrics {
    /// Registers every sink metric in `registry` and returns the handles.
    ///
    /// # Panics
    ///
    /// Panics if any registration fails, since each result is unwrapped.
    pub fn new(registry: &Registry) -> Self {
        // General sink metrics.
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        // Note: this metric's second label is `connector_type`, not `connector`.
        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        // Log store writer metrics: labelled without the connector.
        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        // Log store reader metrics: labelled with the connector as well.
        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        // Iceberg writer metrics.
        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        // Unlike the other iceberg metrics, this one is labelled per table rather than per actor.
        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}
573
/// Parameters handed to a sink when it creates its log sinker
/// (see `Sink::new_log_sinker`).
#[derive(Clone)]
pub struct SinkWriterParam {
    /// Identifier of the executor; presumably the one running this sink — confirm.
    pub executor_id: ExecutorId,
    /// Optional vnode bitmap. NOTE(review): semantics are not visible in this file.
    pub vnode_bitmap: Option<Bitmap>,
    /// Optional client used to reach the meta service (real or mock).
    pub meta_client: Option<SinkMetaClient>,
    /// Optional index of an extra partition column.
    /// NOTE(review): how the column is used is not visible in this file.
    pub extra_partition_col_idx: Option<usize>,

    /// Actor id; used as the `actor_id` metric label.
    pub actor_id: ActorId,
    /// Sink id; used as the `sink_id` metric label.
    pub sink_id: SinkId,
    /// Sink name; used as the `sink_name` metric label.
    pub sink_name: String,
    /// Connector name; used as the connector metric label.
    pub connector: String,
    /// Streaming configuration passed along to the writer.
    pub streaming_config: StreamingConfig,
}
592
/// Per-writer metric handles, obtained from `GLOBAL_SINK_METRICS` with the
/// writer's label values already applied.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    /// Histogram taken from the `sink_commit_duration` metric vector.
    pub sink_commit_duration: LabelGuardedHistogram,
    /// Counter taken from the `connector_sink_rows_received` metric vector.
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
598
599impl SinkWriterMetrics {
600 pub fn new(writer_param: &SinkWriterParam) -> Self {
601 let labels = [
602 &writer_param.actor_id.to_string(),
603 writer_param.connector.as_str(),
604 &writer_param.sink_id.to_string(),
605 writer_param.sink_name.as_str(),
606 ];
607 let sink_commit_duration = GLOBAL_SINK_METRICS
608 .sink_commit_duration
609 .with_guarded_label_values(&labels);
610 let connector_sink_rows_received = GLOBAL_SINK_METRICS
611 .connector_sink_rows_received
612 .with_guarded_label_values(&labels);
613 Self {
614 sink_commit_duration,
615 connector_sink_rows_received,
616 }
617 }
618
619 #[cfg(test)]
620 pub fn for_test() -> Self {
621 Self {
622 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
623 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
624 }
625 }
626}
627
/// Client used by sinks to reach the meta service: either the real
/// `MetaClient` or a `MockMetaClient`.
#[derive(Clone)]
pub enum SinkMetaClient {
    /// The real meta client.
    MetaClient(MetaClient),
    /// A mock client; `add_sink_fail_evet_log` is a no-op for this variant.
    MockMetaClient(MockMetaClient),
}
633
634impl SinkMetaClient {
635 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
636 match self {
637 SinkMetaClient::MetaClient(meta_client) => {
638 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
639 meta_client.sink_coordinate_client().await,
640 )
641 }
642 SinkMetaClient::MockMetaClient(mock_meta_client) => {
643 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
644 mock_meta_client.sink_coordinate_client(),
645 )
646 }
647 }
648 }
649
650 pub async fn add_sink_fail_evet_log(
651 &self,
652 sink_id: SinkId,
653 sink_name: String,
654 connector: String,
655 error: String,
656 ) {
657 match self {
658 SinkMetaClient::MetaClient(meta_client) => {
659 match meta_client
660 .add_sink_fail_evet(sink_id, sink_name, connector, error)
661 .await
662 {
663 Ok(_) => {}
664 Err(e) => {
665 tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
666 }
667 }
668 }
669 SinkMetaClient::MockMetaClient(_) => {}
670 }
671 }
672}
673
674impl SinkWriterParam {
675 pub fn for_test() -> Self {
676 SinkWriterParam {
677 executor_id: Default::default(),
678 vnode_bitmap: Default::default(),
679 meta_client: Default::default(),
680 extra_partition_col_idx: Default::default(),
681
682 actor_id: 1.into(),
683 sink_id: SinkId::new(1),
684 sink_name: "test_sink".to_owned(),
685 connector: "test_connector".to_owned(),
686 streaming_config: StreamingConfig::default(),
687 }
688 }
689}
690
691fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
692 matches!(
693 sink_name,
694 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
695 )
696}
697pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
698 const SINK_NAME: &'static str;
699
700 type LogSinker: LogSinker;
701
702 fn set_default_commit_checkpoint_interval(
703 desc: &mut SinkDesc,
704 user_specified: &SinkDecouple,
705 ) -> Result<()> {
706 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
707 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
708 Some(commit_checkpoint_interval) => {
709 let commit_checkpoint_interval = commit_checkpoint_interval
710 .parse::<u64>()
711 .map_err(|e| SinkError::Config(anyhow!(e)))?;
712 if matches!(user_specified, SinkDecouple::Disable)
713 && commit_checkpoint_interval > 1
714 {
715 return Err(SinkError::Config(anyhow!(
716 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
717 )));
718 }
719 }
720 None => match user_specified {
721 SinkDecouple::Default | SinkDecouple::Enable => {
722 if matches!(Self::SINK_NAME, ICEBERG_SINK) {
723 desc.properties.insert(
724 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
725 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
726 );
727 } else {
728 desc.properties.insert(
729 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
730 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
731 );
732 }
733 }
734 SinkDecouple::Disable => {
735 desc.properties.insert(
736 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
737 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
738 );
739 }
740 },
741 }
742 }
743 Ok(())
744 }
745
746 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
748 match user_specified {
749 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
750 SinkDecouple::Disable => Ok(false),
751 }
752 }
753
754 fn support_schema_change() -> bool {
755 false
756 }
757
758 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
759 Ok(())
760 }
761
762 async fn validate(&self) -> Result<()>;
763 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
764
765 fn is_coordinated_sink(&self) -> bool {
766 false
767 }
768
769 async fn new_coordinator(
770 &self,
771 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
772 ) -> Result<SinkCommitCoordinator> {
773 Err(SinkError::Coordinator(anyhow!("no coordinator")))
774 }
775}
776
/// The log-reading interface handed to a `LogSinker`. It is implemented
/// below for `&mut R` where `R: LogReader`, forwarding each call to `R`.
pub trait SinkLogReader: Send {
    /// Starts reading from an optional start offset.
    /// NOTE(review): the meaning of `None` is defined by the underlying `LogReader` — confirm.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Yields the next item as a `(u64, LogStoreReadItem)` pair; the `u64` is
    /// presumably the item's epoch — confirm against `LogReader::next_item`.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log at the given offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
793
794impl<R: LogReader> SinkLogReader for &mut R {
795 fn next_item(
796 &mut self,
797 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
798 <R as LogReader>::next_item(*self)
799 }
800
801 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
802 <R as LogReader>::truncate(*self, offset)
803 }
804
805 fn start_from(
806 &mut self,
807 start_offset: Option<u64>,
808 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
809 <R as LogReader>::start_from(*self, start_offset)
810 }
811}
812
/// Consumes a log reader and writes its contents to the sink.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Consumes items from `log_reader` and sinks them.
    ///
    /// The success type is `!`, so this method can only finish by returning an error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared callback that, given a `SinkId`, asynchronously yields a `u64`
/// together with an unbounded receiver of further `u64` values.
/// NOTE(review): judging by the name, these are presumably the committed
/// epoch at subscription time and later committed epochs — confirm.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
824
/// A sink commit coordinator, either single-phase or two-phase
/// (returned by `Sink::new_coordinator`).
pub enum SinkCommitCoordinator {
    /// Boxed coordinator using the single-phase commit protocol.
    SinglePhase(BoxSinglePhaseCoordinator),
    /// Boxed coordinator using the two-phase commit protocol.
    TwoPhase(BoxTwoPhaseCoordinator),
}
829
830#[async_trait]
831pub trait SinglePhaseCommitCoordinator {
832 async fn init(&mut self) -> Result<()>;
834
835 async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
837
838 async fn commit_schema_change(
841 &mut self,
842 _epoch: u64,
843 _schema_change: PbSinkSchemaChange,
844 ) -> Result<()> {
845 Err(SinkError::Coordinator(anyhow!(
846 "Schema change is not implemented for single-phase commit coordinator {}",
847 std::any::type_name::<Self>()
848 )))
849 }
850}
851
852#[async_trait]
853pub trait TwoPhaseCommitCoordinator {
854 async fn init(&mut self) -> Result<()>;
856
857 async fn pre_commit(
859 &mut self,
860 epoch: u64,
861 metadata: Vec<SinkMetadata>,
862 schema_change: Option<PbSinkSchemaChange>,
863 ) -> Result<Option<Vec<u8>>>;
864
865 async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;
867
868 async fn commit_schema_change(
871 &mut self,
872 _epoch: u64,
873 _schema_change: PbSinkSchemaChange,
874 ) -> Result<()> {
875 Err(SinkError::Coordinator(anyhow!(
876 "Schema change is not implemented for two-phase commit coordinator {}",
877 std::any::type_name::<Self>()
878 )))
879 }
880
881 async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
883}
884
885impl SinkImpl {
886 pub fn new(mut param: SinkParam) -> Result<Self> {
887 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
888
889 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
891
892 let sink_type = param
893 .properties
894 .get(CONNECTOR_TYPE_KEY)
895 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
896
897 let sink_type = sink_type.to_lowercase();
898 match_sink_name_str!(
899 sink_type.as_str(),
900 SinkType,
901 Ok(SinkType::try_from(param)?.into()),
902 |other| {
903 Err(SinkError::Config(anyhow!(
904 "unsupported sink connector {}",
905 other
906 )))
907 }
908 )
909 }
910
911 pub fn is_sink_into_table(&self) -> bool {
912 matches!(self, SinkImpl::Table(_))
913 }
914
915 pub fn is_blackhole(&self) -> bool {
916 matches!(self, SinkImpl::BlackHole(_))
917 }
918
919 pub fn is_coordinated_sink(&self) -> bool {
920 dispatch_sink!(self, sink, sink.is_coordinated_sink())
921 }
922}
923
/// Builds a `SinkImpl` from `param`; a thin wrapper around `SinkImpl::new`
/// that returns the same errors.
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
927
// Defines the `SinkImpl` enum with one variant per entry of `for_all_sinks!`,
// each holding the boxed sink type, plus a `From<SinkType> for SinkImpl`
// conversion for every sink type.
macro_rules! def_sink_impl {
    // Entry point: feed the sink table into the arm below.
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
951
/// Result type used throughout the sink module, with `SinkError` as the error.
pub type Result<T> = std::result::Result<T, SinkError>;
953
/// Errors produced by sinks.
///
/// Most variants correspond to one connector or subsystem and carry either an
/// `anyhow::Error` source or a plain message string. `Internal` and
/// `Connector` are transparent wrappers that display as the wrapped error.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    // Also the target of the `From` conversions for opendal, parquet and array errors.
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    // The only variant with `#[from] anyhow::Error`, so `?` on an
    // `anyhow::Error` produces this variant.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1093
1094impl From<sea_orm::DbErr> for SinkError {
1095 fn from(err: sea_orm::DbErr) -> Self {
1096 SinkError::Iceberg(anyhow!(err))
1097 }
1098}
1099
1100impl From<OpendalError> for SinkError {
1101 fn from(error: OpendalError) -> Self {
1102 SinkError::File(error.to_report_string())
1103 }
1104}
1105
1106impl From<parquet::errors::ParquetError> for SinkError {
1107 fn from(error: parquet::errors::ParquetError) -> Self {
1108 SinkError::File(error.to_report_string())
1109 }
1110}
1111
1112impl From<ArrayError> for SinkError {
1113 fn from(error: ArrayError) -> Self {
1114 SinkError::File(error.to_report_string())
1115 }
1116}
1117
1118impl From<RpcError> for SinkError {
1119 fn from(value: RpcError) -> Self {
1120 SinkError::Remote(anyhow!(value))
1121 }
1122}
1123
1124impl From<RedisError> for SinkError {
1125 fn from(value: RedisError) -> Self {
1126 SinkError::Redis(value.to_report_string())
1127 }
1128}
1129
1130impl From<tiberius::error::Error> for SinkError {
1131 fn from(err: tiberius::error::Error) -> Self {
1132 SinkError::SqlServer(anyhow!(err))
1133 }
1134}
1135
1136impl From<::elasticsearch::Error> for SinkError {
1137 fn from(err: ::elasticsearch::Error) -> Self {
1138 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1139 }
1140}
1141
1142impl From<::opensearch::Error> for SinkError {
1143 fn from(err: ::opensearch::Error) -> Self {
1144 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1145 }
1146}
1147
1148impl From<tokio_postgres::Error> for SinkError {
1149 fn from(err: tokio_postgres::Error) -> Self {
1150 SinkError::Postgres(anyhow!(err))
1151 }
1152}