1feature_gated_sink_mod!(big_query, "bigquery");
16pub mod boxed;
17pub mod catalog;
18feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
19pub mod coordinate;
20pub mod decouple_checkpoint_log_sink;
21feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
22feature_gated_sink_mod!(doris, "doris");
23#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
24pub mod doris_starrocks_connector;
25pub mod dynamodb;
26pub mod elasticsearch_opensearch;
27pub mod encoder;
28pub mod file_sink;
29pub mod formatter;
30feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
31pub mod iceberg;
32pub mod kafka;
33pub mod kinesis;
34use risingwave_common::bail;
35pub mod jdbc_jni_client;
36pub mod log_store;
37pub mod mock_coordination_client;
38pub mod mongodb;
39pub mod mqtt;
40pub mod nats;
41pub mod postgres;
42pub mod pulsar;
43pub mod redis;
44pub mod remote;
45pub mod snowflake_redshift;
46pub mod sqlserver;
47feature_gated_sink_mod!(starrocks, "starrocks");
48pub mod test_sink;
49pub mod trivial;
50pub mod utils;
51pub mod writer;
/// Convenience re-exports of frequently used items defined in `crate::sink`.
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}
58
59use std::collections::BTreeMap;
60use std::future::Future;
61use std::sync::{Arc, LazyLock};
62
63use ::redis::RedisError;
64use anyhow::anyhow;
65use async_trait::async_trait;
66use decouple_checkpoint_log_sink::{
67 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
68 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
69};
70use futures::future::BoxFuture;
71use opendal::Error as OpendalError;
72use prometheus::Registry;
73use risingwave_common::array::ArrayError;
74use risingwave_common::bitmap::Bitmap;
75use risingwave_common::catalog::{ColumnDesc, Field, Schema};
76use risingwave_common::config::StreamingConfig;
77use risingwave_common::hash::ActorId;
78use risingwave_common::metrics::{
79 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
80 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
81};
82use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
83use risingwave_common::secret::{LocalSecretManager, SecretError};
84use risingwave_common::session_config::sink_decouple::SinkDecouple;
85use risingwave_common::{
86 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
87 register_guarded_int_gauge_vec_with_registry,
88};
89use risingwave_pb::catalog::PbSinkType;
90use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
91use risingwave_rpc_client::MetaClient;
92use risingwave_rpc_client::error::RpcError;
93use sea_orm::DatabaseConnection;
94use thiserror::Error;
95use thiserror_ext::AsReport;
96use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
97pub use tracing;
98
99use self::catalog::{SinkFormatDesc, SinkType};
100use self::clickhouse::CLICKHOUSE_SINK;
101use self::deltalake::DELTALAKE_SINK;
102use self::iceberg::ICEBERG_SINK;
103use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
104use self::starrocks::STARROCKS_SINK;
105use crate::WithPropertiesExt;
106use crate::connector_common::IcebergSinkCompactionUpdate;
107use crate::error::{ConnectorError, ConnectorResult};
108use crate::sink::catalog::desc::SinkDesc;
109use crate::sink::catalog::{SinkCatalog, SinkId};
110use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
111use crate::sink::file_sink::fs::FsSink;
112use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
113use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
114use crate::sink::utils::feature_gated_sink_mod;
115
// NOTE(review): not referenced in the visible part of this file; presumably the capacity of
// bounded channels created by code in this module — confirm against its users.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes `$macro` with the list of every sink known to this module.
///
/// The first argument handed to `$macro` is a brace-delimited, comma-separated list of
/// `{ VariantName, SinkType, ConfigType }` entries; sinks listed without a dedicated config
/// type use `()`. Any extra `$arg` token trees are forwarded unchanged after that list.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
162
/// Callback for `for_all_sinks!`: forwards the config-type tokens of every sink entry to
/// `generate_config_use_single!`, one invocation per entry.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
171
/// Emits the `use` item for a single sink config type.
#[macro_export]
macro_rules! generate_config_use_single {
    // A sink whose config type is `()` has nothing to import.
    (()) => {};

    // Any other config path is re-exported to the parent module of the expansion site.
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
183
/// Expands to one `pub(super) use` item per sink config type listed in `for_all_sinks!`;
/// sinks whose config type is `()` contribute nothing.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
191
/// Runs `$body` against whichever concrete sink a `SinkImpl` value holds.
///
/// Call it as `dispatch_sink!(impl_expr, binding, body)`: the macro expands to a `match` over
/// every `SinkImpl` variant that binds the variant's payload to `binding` and evaluates `body`.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: reached through `for_all_sinks!`, which supplies the sink list first.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Public entry point: wraps the expressions in braces so they travel as single token trees.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
207
/// Selects a sink type by its connector name.
///
/// Call it as `match_sink_name_str!(name_str, TypeName, body, on_other)`: `name_str` is matched
/// against the `SINK_NAME` of every sink in `for_all_sinks!`. In the matching arm `TypeName` is
/// declared as a type alias of that sink type and `body` is evaluated; if nothing matches,
/// `on_other` is called with the unmatched string.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: reached through `for_all_sinks!`, which supplies the sink list first.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Public entry point: wraps the expressions in braces so they travel as single token trees.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
228
/// Property key naming the connector; `SinkImpl::new` reads it to pick the sink implementation.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key holding the sink type; `SinkParam::from_proto` reads it together with
/// [`CONNECTOR_TYPE_KEY`] to derive a legacy format description.
pub const SINK_TYPE_OPTION: &str = "type";
/// Presumably the property key of the snapshot option — not referenced in the visible part of
/// this file; confirm against its users.
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// The four values below are presumably the accepted values of the `type` option — confirm.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
/// Presumably the user-facing option key that forces append-only output — confirm.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
/// Presumably the user-facing option key that forces compaction — confirm.
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";
239
/// Description of a sink, convertible to and from the protobuf `PbSinkParam` and buildable
/// from a `SinkCatalog`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Connector properties; `try_from_sink_catalog` stores them with secrets filled in.
    pub properties: BTreeMap<String, String>,
    /// Column descriptors; `try_from_sink_catalog` takes them from the catalog's visible columns.
    pub columns: Vec<ColumnDesc>,
    /// Column indices of the downstream primary key. `None` is serialized as an empty
    /// `pk_indices` list, and an empty list is deserialized back to `None`.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    pub sink_from_name: String,
}
259
260impl SinkParam {
261 pub fn from_proto(pb_param: PbSinkParam) -> Self {
262 let table_schema = pb_param.table_schema.expect("should contain table schema");
263 let format_desc = match pb_param.format_desc {
264 Some(f) => f.try_into().ok(),
265 None => {
266 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
267 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
268 match (connector, r#type) {
269 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
270 _ => None,
271 }
272 }
273 };
274 Self {
275 sink_id: SinkId::from(pb_param.sink_id),
276 sink_name: pb_param.sink_name,
277 properties: pb_param.properties,
278 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
279 downstream_pk: if table_schema.pk_indices.is_empty() {
280 None
281 } else {
282 Some(
283 (table_schema.pk_indices.iter())
284 .map(|i| *i as usize)
285 .collect(),
286 )
287 },
288 sink_type: SinkType::from_proto(
289 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
290 ),
291 format_desc,
292 db_name: pb_param.db_name,
293 sink_from_name: pb_param.sink_from_name,
294 }
295 }
296
297 pub fn to_proto(&self) -> PbSinkParam {
298 PbSinkParam {
299 sink_id: self.sink_id,
300 sink_name: self.sink_name.clone(),
301 properties: self.properties.clone(),
302 table_schema: Some(TableSchema {
303 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
304 pk_indices: (self.downstream_pk.as_ref())
305 .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
306 }),
307 sink_type: self.sink_type.to_proto().into(),
308 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
309 db_name: self.db_name.clone(),
310 sink_from_name: self.sink_from_name.clone(),
311 }
312 }
313
314 pub fn schema(&self) -> Schema {
315 Schema {
316 fields: self.columns.iter().map(Field::from).collect(),
317 }
318 }
319
320 pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
326 self.downstream_pk.clone().unwrap_or_default()
327 }
328
329 pub fn fill_secret_for_format_desc(
332 format_desc: Option<SinkFormatDesc>,
333 ) -> Result<Option<SinkFormatDesc>> {
334 match format_desc {
335 Some(mut format_desc) => {
336 format_desc.options = LocalSecretManager::global()
337 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
338 Ok(Some(format_desc))
339 }
340 None => Ok(None),
341 }
342 }
343
344 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
346 let columns = sink_catalog
347 .visible_columns()
348 .map(|col| col.column_desc.clone())
349 .collect();
350 let properties_with_secret = LocalSecretManager::global()
351 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
352 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
353 Ok(Self {
354 sink_id: sink_catalog.id,
355 sink_name: sink_catalog.name,
356 properties: properties_with_secret,
357 columns,
358 downstream_pk: sink_catalog.downstream_pk,
359 sink_type: sink_catalog.sink_type,
360 format_desc: format_desc_with_secret,
361 db_name: sink_catalog.db_name,
362 sink_from_name: sink_catalog.sink_from_name,
363 })
364 }
365}
366
367pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
368 use crate::enforce_secret::EnforceSecret;
369
370 let connector = props
371 .get_connector()
372 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
373 let key_iter = props.key_iter();
374 match_sink_name_str!(
375 connector.as_str(),
376 PropType,
377 PropType::enforce_secret(key_iter),
378 |other| bail!("connector '{}' is not supported", other)
379 )
380}
381
/// Process-wide sink metrics, created on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
384
/// Label-guarded metric families for sinks, the log store and the iceberg writer.
/// See `SinkMetrics::new` for the metric names and label sets.
#[derive(Clone)]
pub struct SinkMetrics {
    /// Duration of commit op in sink.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    /// Number of rows received by sink.
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    /// The first write epoch of log store.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    /// The latest write epoch of log store.
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    /// The write rate of rows.
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    /// The latest read epoch of log store.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    /// The read rate of rows.
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    /// Total size of chunks read by log reader.
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    /// Accumulated duration of LogReader to wait for next call to create future.
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    /// The qps of iceberg writer.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    /// The latency of iceberg writer.
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    /// The unflushed data file count of iceberg rolling writer.
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    /// The delete cache num of iceberg position delete writer.
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    /// The partition num of iceberg partition writer.
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    /// The write bytes of iceberg writer.
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    /// The snapshot number of iceberg table.
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
410
411impl SinkMetrics {
412 pub fn new(registry: &Registry) -> Self {
413 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
414 "sink_commit_duration",
415 "Duration of commit op in sink",
416 &["actor_id", "connector", "sink_id", "sink_name"],
417 registry
418 )
419 .unwrap();
420
421 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
422 "connector_sink_rows_received",
423 "Number of rows received by sink",
424 &["actor_id", "connector_type", "sink_id", "sink_name"],
425 registry
426 )
427 .unwrap();
428
429 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
430 "log_store_first_write_epoch",
431 "The first write epoch of log store",
432 &["actor_id", "sink_id", "sink_name"],
433 registry
434 )
435 .unwrap();
436
437 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
438 "log_store_latest_write_epoch",
439 "The latest write epoch of log store",
440 &["actor_id", "sink_id", "sink_name"],
441 registry
442 )
443 .unwrap();
444
445 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
446 "log_store_write_rows",
447 "The write rate of rows",
448 &["actor_id", "sink_id", "sink_name"],
449 registry
450 )
451 .unwrap();
452
453 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
454 "log_store_latest_read_epoch",
455 "The latest read epoch of log store",
456 &["actor_id", "connector", "sink_id", "sink_name"],
457 registry
458 )
459 .unwrap();
460
461 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
462 "log_store_read_rows",
463 "The read rate of rows",
464 &["actor_id", "connector", "sink_id", "sink_name"],
465 registry
466 )
467 .unwrap();
468
469 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
470 "log_store_read_bytes",
471 "Total size of chunks read by log reader",
472 &["actor_id", "connector", "sink_id", "sink_name"],
473 registry
474 )
475 .unwrap();
476
477 let log_store_reader_wait_new_future_duration_ns =
478 register_guarded_int_counter_vec_with_registry!(
479 "log_store_reader_wait_new_future_duration_ns",
480 "Accumulated duration of LogReader to wait for next call to create future",
481 &["actor_id", "connector", "sink_id", "sink_name"],
482 registry
483 )
484 .unwrap();
485
486 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
487 "iceberg_write_qps",
488 "The qps of iceberg writer",
489 &["actor_id", "sink_id", "sink_name"],
490 registry
491 )
492 .unwrap();
493
494 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
495 "iceberg_write_latency",
496 "The latency of iceberg writer",
497 &["actor_id", "sink_id", "sink_name"],
498 registry
499 )
500 .unwrap();
501
502 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
503 "iceberg_rolling_unflushed_data_file",
504 "The unflushed data file count of iceberg rolling writer",
505 &["actor_id", "sink_id", "sink_name"],
506 registry
507 )
508 .unwrap();
509
510 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
511 "iceberg_position_delete_cache_num",
512 "The delete cache num of iceberg position delete writer",
513 &["actor_id", "sink_id", "sink_name"],
514 registry
515 )
516 .unwrap();
517
518 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
519 "iceberg_partition_num",
520 "The partition num of iceberg partition writer",
521 &["actor_id", "sink_id", "sink_name"],
522 registry
523 )
524 .unwrap();
525
526 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
527 "iceberg_write_bytes",
528 "The write bytes of iceberg writer",
529 &["actor_id", "sink_id", "sink_name"],
530 registry
531 )
532 .unwrap();
533
534 let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
535 "iceberg_snapshot_num",
536 "The snapshot number of iceberg table",
537 &["sink_name", "catalog_name", "table_name"],
538 registry
539 )
540 .unwrap();
541
542 Self {
543 sink_commit_duration,
544 connector_sink_rows_received,
545 log_store_first_write_epoch,
546 log_store_latest_write_epoch,
547 log_store_write_rows,
548 log_store_latest_read_epoch,
549 log_store_read_rows,
550 log_store_read_bytes,
551 log_store_reader_wait_new_future_duration_ns,
552 iceberg_write_qps,
553 iceberg_write_latency,
554 iceberg_rolling_unflushed_data_file,
555 iceberg_position_delete_cache_num,
556 iceberg_partition_num,
557 iceberg_write_bytes,
558 iceberg_snapshot_num,
559 }
560 }
561}
562
/// Parameters handed to `Sink::new_log_sinker` when a log sinker is created.
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    pub extra_partition_col_idx: Option<usize>,

    // `actor_id`, `sink_id`, `sink_name` and `connector` are used as metric labels by
    // `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}
581
/// Per-writer metric handles obtained from `GLOBAL_SINK_METRICS` with the writer's labels
/// already applied.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
587
588impl SinkWriterMetrics {
589 pub fn new(writer_param: &SinkWriterParam) -> Self {
590 let labels = [
591 &writer_param.actor_id.to_string(),
592 writer_param.connector.as_str(),
593 &writer_param.sink_id.to_string(),
594 writer_param.sink_name.as_str(),
595 ];
596 let sink_commit_duration = GLOBAL_SINK_METRICS
597 .sink_commit_duration
598 .with_guarded_label_values(&labels);
599 let connector_sink_rows_received = GLOBAL_SINK_METRICS
600 .connector_sink_rows_received
601 .with_guarded_label_values(&labels);
602 Self {
603 sink_commit_duration,
604 connector_sink_rows_received,
605 }
606 }
607
608 #[cfg(test)]
609 pub fn for_test() -> Self {
610 Self {
611 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
612 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
613 }
614 }
615}
616
/// Meta client handle used by sinks: either a real `MetaClient` or a `MockMetaClient`.
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
622
623impl SinkMetaClient {
624 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
625 match self {
626 SinkMetaClient::MetaClient(meta_client) => {
627 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
628 meta_client.sink_coordinate_client().await,
629 )
630 }
631 SinkMetaClient::MockMetaClient(mock_meta_client) => {
632 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
633 mock_meta_client.sink_coordinate_client(),
634 )
635 }
636 }
637 }
638
639 pub async fn add_sink_fail_evet_log(
640 &self,
641 sink_id: SinkId,
642 sink_name: String,
643 connector: String,
644 error: String,
645 ) {
646 match self {
647 SinkMetaClient::MetaClient(meta_client) => {
648 match meta_client
649 .add_sink_fail_evet(sink_id, sink_name, connector, error)
650 .await
651 {
652 Ok(_) => {}
653 Err(e) => {
654 tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
655 }
656 }
657 }
658 SinkMetaClient::MockMetaClient(_) => {}
659 }
660 }
661}
662
663impl SinkWriterParam {
664 pub fn for_test() -> Self {
665 SinkWriterParam {
666 executor_id: Default::default(),
667 vnode_bitmap: Default::default(),
668 meta_client: Default::default(),
669 extra_partition_col_idx: Default::default(),
670
671 actor_id: 1.into(),
672 sink_id: SinkId::new(1),
673 sink_name: "test_sink".to_owned(),
674 connector: "test_connector".to_owned(),
675 streaming_config: StreamingConfig::default(),
676 }
677 }
678}
679
680fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
681 matches!(
682 sink_name,
683 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
684 )
685}
686pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
687 const SINK_NAME: &'static str;
688
689 type LogSinker: LogSinker;
690 #[expect(deprecated)]
691 type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;
692
693 fn set_default_commit_checkpoint_interval(
694 desc: &mut SinkDesc,
695 user_specified: &SinkDecouple,
696 ) -> Result<()> {
697 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
698 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
699 Some(commit_checkpoint_interval) => {
700 let commit_checkpoint_interval = commit_checkpoint_interval
701 .parse::<u64>()
702 .map_err(|e| SinkError::Config(anyhow!(e)))?;
703 if matches!(user_specified, SinkDecouple::Disable)
704 && commit_checkpoint_interval > 1
705 {
706 return Err(SinkError::Config(anyhow!(
707 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
708 )));
709 }
710 }
711 None => match user_specified {
712 SinkDecouple::Default | SinkDecouple::Enable => {
713 if matches!(Self::SINK_NAME, ICEBERG_SINK) {
714 desc.properties.insert(
715 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
716 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
717 );
718 } else {
719 desc.properties.insert(
720 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
721 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
722 );
723 }
724 }
725 SinkDecouple::Disable => {
726 desc.properties.insert(
727 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
728 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
729 );
730 }
731 },
732 }
733 }
734 Ok(())
735 }
736
737 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
739 match user_specified {
740 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
741 SinkDecouple::Disable => Ok(false),
742 }
743 }
744
745 fn support_schema_change() -> bool {
746 false
747 }
748
749 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
750 Ok(())
751 }
752
753 async fn validate(&self) -> Result<()>;
754 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
755
756 fn is_coordinated_sink(&self) -> bool {
757 false
758 }
759
760 async fn new_coordinator(
761 &self,
762 _db: DatabaseConnection,
763 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
764 ) -> Result<Self::Coordinator> {
765 Err(SinkError::Coordinator(anyhow!("no coordinator")))
766 }
767}
768
/// Log-reading operations available to a [`LogSinker`]. Implemented for `&mut R` where
/// `R: LogReader` by forwarding to the `LogReader` method of the same name.
pub trait SinkLogReader: Send {
    /// Starts reading; `start_offset` is presumably where to resume from, with `None` leaving
    /// the choice to the reader — confirm against `LogReader::start_from`.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Resolves to the next item together with a `u64` (presumably its epoch — confirm).
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log at `offset`.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
785
786impl<R: LogReader> SinkLogReader for &mut R {
787 fn next_item(
788 &mut self,
789 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
790 <R as LogReader>::next_item(*self)
791 }
792
793 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
794 <R as LogReader>::truncate(*self, offset)
795 }
796
797 fn start_from(
798 &mut self,
799 start_offset: Option<u64>,
800 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
801 <R as LogReader>::start_from(*self, start_offset)
802 }
803}
804
/// Consumer side of a sink: reads from a log reader and writes to the external system.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Consumes the log and sinks it. The `Ok` type is `!`, so this only ever returns with an
    /// error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared callback that, given a sink id, returns a boxed future resolving to a `u64` and an
/// unbounded receiver of `u64` values.
// NOTE(review): presumably the first value is the latest committed epoch and the receiver
// yields subsequently committed epochs — confirm against the implementation.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
816
/// Coordinator interface for sinks that commit through a coordinator.
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initializes the coordinator with a committed-epoch subscriber.
    // NOTE(review): the meaning of the returned `Option<u64>` is not visible here (presumably
    // a previously committed epoch) — confirm.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// Commits `metadata` for `epoch`. `add_columns` optionally carries fields that are
    /// presumably to be added to the downstream schema — confirm.
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}
832
/// Default `Sink::Coordinator` type. It wraps the never type `!`, so no value of it can be
/// constructed.
#[deprecated]
pub struct NoSinkCommitCoordinator(!);
848
// `NoSinkCommitCoordinator` cannot be constructed, so these methods can never be called.
#[expect(deprecated)]
#[async_trait]
impl SinkCommitCoordinator for NoSinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        unreachable!()
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        _metadata: Vec<SinkMetadata>,
        _add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        unreachable!()
    }
}
865
866impl SinkImpl {
867 pub fn new(mut param: SinkParam) -> Result<Self> {
868 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
869
870 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
872
873 let sink_type = param
874 .properties
875 .get(CONNECTOR_TYPE_KEY)
876 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
877
878 let sink_type = sink_type.to_lowercase();
879 match_sink_name_str!(
880 sink_type.as_str(),
881 SinkType,
882 Ok(SinkType::try_from(param)?.into()),
883 |other| {
884 Err(SinkError::Config(anyhow!(
885 "unsupported sink connector {}",
886 other
887 )))
888 }
889 )
890 }
891
892 pub fn is_sink_into_table(&self) -> bool {
893 matches!(self, SinkImpl::Table(_))
894 }
895
896 pub fn is_blackhole(&self) -> bool {
897 matches!(self, SinkImpl::BlackHole(_))
898 }
899
900 pub fn is_coordinated_sink(&self) -> bool {
901 dispatch_sink!(self, sink, sink.is_coordinated_sink())
902 }
903}
904
/// Builds a [`SinkImpl`] from `param`; a thin wrapper around [`SinkImpl::new`].
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
908
/// Defines the `SinkImpl` enum with one boxed variant per sink listed in `for_all_sinks!`,
/// plus a `From<SinkType> for SinkImpl` conversion for every sink type.
macro_rules! def_sink_impl {
    // Entry point: asks `for_all_sinks!` to call back with the sink list.
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
932
/// Result type used throughout the sink module, with [`SinkError`] as the error type.
pub type Result<T> = std::result::Result<T, SinkError>;
934
/// Error type of the sink module.
///
/// Most variants are named after the connector or subsystem that produced the error and wrap
/// either an `anyhow::Error` (kept as the error source) or a pre-rendered message string.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    // Catch-all for any `anyhow::Error`; displayed transparently as the inner error.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    // Connector-level errors are displayed transparently as the inner error.
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1074
1075impl From<sea_orm::DbErr> for SinkError {
1076 fn from(err: sea_orm::DbErr) -> Self {
1077 SinkError::Iceberg(anyhow!(err))
1078 }
1079}
1080
1081impl From<OpendalError> for SinkError {
1082 fn from(error: OpendalError) -> Self {
1083 SinkError::File(error.to_report_string())
1084 }
1085}
1086
1087impl From<parquet::errors::ParquetError> for SinkError {
1088 fn from(error: parquet::errors::ParquetError) -> Self {
1089 SinkError::File(error.to_report_string())
1090 }
1091}
1092
1093impl From<ArrayError> for SinkError {
1094 fn from(error: ArrayError) -> Self {
1095 SinkError::File(error.to_report_string())
1096 }
1097}
1098
1099impl From<RpcError> for SinkError {
1100 fn from(value: RpcError) -> Self {
1101 SinkError::Remote(anyhow!(value))
1102 }
1103}
1104
1105impl From<RedisError> for SinkError {
1106 fn from(value: RedisError) -> Self {
1107 SinkError::Redis(value.to_report_string())
1108 }
1109}
1110
1111impl From<tiberius::error::Error> for SinkError {
1112 fn from(err: tiberius::error::Error) -> Self {
1113 SinkError::SqlServer(anyhow!(err))
1114 }
1115}
1116
1117impl From<::elasticsearch::Error> for SinkError {
1118 fn from(err: ::elasticsearch::Error) -> Self {
1119 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1120 }
1121}
1122
1123impl From<::opensearch::Error> for SinkError {
1124 fn from(err: ::opensearch::Error) -> Self {
1125 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1126 }
1127}
1128
1129impl From<tokio_postgres::Error> for SinkError {
1130 fn from(err: tokio_postgres::Error) -> Self {
1131 SinkError::Postgres(anyhow!(err))
1132 }
1133}