1pub mod big_query;
15pub mod boxed;
16pub mod catalog;
17pub mod clickhouse;
18pub mod coordinate;
19pub mod decouple_checkpoint_log_sink;
20pub mod deltalake;
21pub mod doris;
22pub mod doris_starrocks_connector;
23pub mod dynamodb;
24pub mod elasticsearch_opensearch;
25pub mod encoder;
26pub mod file_sink;
27pub mod formatter;
28pub mod google_pubsub;
29pub mod iceberg;
30pub mod kafka;
31pub mod kinesis;
32use risingwave_common::bail;
33pub mod jdbc_jni_client;
34pub mod log_store;
35pub mod mock_coordination_client;
36pub mod mongodb;
37pub mod mqtt;
38pub mod nats;
39pub mod postgres;
40pub mod pulsar;
41pub mod redis;
42pub mod remote;
43pub mod snowflake_redshift;
44pub mod sqlserver;
45pub mod starrocks;
46pub mod test_sink;
47pub mod trivial;
48pub mod utils;
49pub mod writer;
50pub mod prelude {
51 pub use crate::sink::{
52 Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
53 SinkParam, SinkWriterParam,
54 };
55}
56
57use std::collections::BTreeMap;
58use std::future::Future;
59use std::sync::{Arc, LazyLock};
60
61use ::clickhouse::error::Error as ClickHouseError;
62use ::redis::RedisError;
63use anyhow::anyhow;
64use async_trait::async_trait;
65use clickhouse::CLICKHOUSE_SINK;
66use decouple_checkpoint_log_sink::{
67 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
68 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
69};
70use deltalake::DELTALAKE_SINK;
71use futures::future::BoxFuture;
72use iceberg::ICEBERG_SINK;
73use opendal::Error as OpendalError;
74use prometheus::Registry;
75use risingwave_common::array::ArrayError;
76use risingwave_common::bitmap::Bitmap;
77use risingwave_common::catalog::{ColumnDesc, Field, Schema};
78use risingwave_common::config::StreamingConfig;
79use risingwave_common::hash::ActorId;
80use risingwave_common::metrics::{
81 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
82 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
83};
84use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
85use risingwave_common::secret::{LocalSecretManager, SecretError};
86use risingwave_common::session_config::sink_decouple::SinkDecouple;
87use risingwave_common::{
88 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
89 register_guarded_int_gauge_vec_with_registry,
90};
91use risingwave_pb::catalog::PbSinkType;
92use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
93use risingwave_rpc_client::MetaClient;
94use risingwave_rpc_client::error::RpcError;
95use sea_orm::DatabaseConnection;
96use starrocks::STARROCKS_SINK;
97use thiserror::Error;
98use thiserror_ext::AsReport;
99use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
100pub use tracing;
101
102use self::catalog::{SinkFormatDesc, SinkType};
103use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
104use crate::WithPropertiesExt;
105use crate::connector_common::IcebergSinkCompactionUpdate;
106use crate::error::{ConnectorError, ConnectorResult};
107use crate::sink::catalog::desc::SinkDesc;
108use crate::sink::catalog::{SinkCatalog, SinkId};
109use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
110use crate::sink::file_sink::fs::FsSink;
111use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
112use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
113use crate::sink::writer::SinkWriter;
114
/// Bounded channel capacity (16). Not referenced elsewhere in this file; presumably used by
/// child sink modules through `super::BOUNDED_CHANNEL_SIZE` (TODO confirm).
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes `$macro!` with the registry of every sink connector, followed by any extra
/// `$arg` tokens.
///
/// Each entry has the form `{ Variant, SinkType, ConfigType }`. `ConfigType` is `()` for
/// sinks without a dedicated config type. This list is consumed by `def_sink_impl!` (which
/// defines `SinkImpl`), `dispatch_sink!`, `match_sink_name_str!` and
/// `use_all_sink_configs!`.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                // NOTE(review): `FsSink` is the only sink path here without a `$crate::`
                // prefix. `macro_rules!` resolves item paths at the expansion site, so
                // every place that expands this list must have `FsSink` in scope. This file
                // imports `crate::sink::file_sink::fs::FsSink` for that reason. Consider
                // `$crate::sink::file_sink::fs::FsSink`.
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
161
/// Receives the `for_all_sinks!` list and emits one `generate_config_use_single!` per entry,
/// passing along that entry's config-type tokens.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
170
/// Expands to `pub(super) use <config path>;` for one sink config type.
#[macro_export]
macro_rules! generate_config_use_single {
    // `()` marks a sink without a dedicated config type: emit nothing.
    (()) => {};

    ($config_type:path) => {
        // The expansion site may not use every config it re-exports.
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
182
/// Re-exports (`pub(super) use`) the config type of every sink listed in `for_all_sinks!`.
/// Sinks whose config type is `()` are skipped.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
190
/// `dispatch_sink!(sink_impl, binding, body)` matches a `SinkImpl` value. In every arm it
/// binds the variant's boxed concrete sink to `binding` and evaluates `body`.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: receives the sink list from `for_all_sinks!`.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Public entry point.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
206
/// `match_sink_name_str!(name, TypeAlias, body, on_other)` compares the string `name`
/// against each sink's `Sink::SINK_NAME`.
///
/// On a match, `body` is evaluated with `TypeAlias` declared as a type alias for that
/// sink. If nothing matches, `on_other(name)` is called.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: receives the sink list from `for_all_sinks!`.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Public entry point.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
227
/// Property key that names the sink connector; `SinkImpl::new` dispatches on it.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key for the (legacy) sink type; used together with `CONNECTOR_TYPE_KEY` to
/// derive a format descriptor in `SinkParam::from_proto`.
pub const SINK_TYPE_OPTION: &str = "type";
/// Property key `snapshot`.
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// Values of the `type` option (presumably the accepted sink types — confirm).
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
/// Property key `force_append_only`.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
236
/// Parameters needed to construct a sink, independent of any particular writer.
///
/// Built from the catalog via `SinkParam::try_from_sink_catalog`, or from protobuf via
/// `SinkParam::from_proto`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Connector properties. `try_from_sink_catalog` resolves secret references into them.
    pub properties: BTreeMap<String, String>,
    /// Columns of the sink. `try_from_sink_catalog` keeps only visible columns.
    pub columns: Vec<ColumnDesc>,
    /// Downstream primary-key positions (presumably indices into `columns`). An empty
    /// `pk_indices` in protobuf form maps to `None`.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    /// Encoding/format descriptor, if any.
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// Name of the relation the sink reads from (presumably — confirm).
    pub sink_from_name: String,
}
256
257impl SinkParam {
258 pub fn from_proto(pb_param: PbSinkParam) -> Self {
259 let table_schema = pb_param.table_schema.expect("should contain table schema");
260 let format_desc = match pb_param.format_desc {
261 Some(f) => f.try_into().ok(),
262 None => {
263 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
264 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
265 match (connector, r#type) {
266 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
267 _ => None,
268 }
269 }
270 };
271 Self {
272 sink_id: SinkId::from(pb_param.sink_id),
273 sink_name: pb_param.sink_name,
274 properties: pb_param.properties,
275 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
276 downstream_pk: if table_schema.pk_indices.is_empty() {
277 None
278 } else {
279 Some(
280 (table_schema.pk_indices.iter())
281 .map(|i| *i as usize)
282 .collect(),
283 )
284 },
285 sink_type: SinkType::from_proto(
286 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
287 ),
288 format_desc,
289 db_name: pb_param.db_name,
290 sink_from_name: pb_param.sink_from_name,
291 }
292 }
293
294 pub fn to_proto(&self) -> PbSinkParam {
295 PbSinkParam {
296 sink_id: self.sink_id.sink_id,
297 sink_name: self.sink_name.clone(),
298 properties: self.properties.clone(),
299 table_schema: Some(TableSchema {
300 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
301 pk_indices: (self.downstream_pk.as_ref())
302 .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
303 }),
304 sink_type: self.sink_type.to_proto().into(),
305 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
306 db_name: self.db_name.clone(),
307 sink_from_name: self.sink_from_name.clone(),
308 }
309 }
310
311 pub fn schema(&self) -> Schema {
312 Schema {
313 fields: self.columns.iter().map(Field::from).collect(),
314 }
315 }
316
317 pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
323 self.downstream_pk.clone().unwrap_or_default()
324 }
325
326 pub fn fill_secret_for_format_desc(
329 format_desc: Option<SinkFormatDesc>,
330 ) -> Result<Option<SinkFormatDesc>> {
331 match format_desc {
332 Some(mut format_desc) => {
333 format_desc.options = LocalSecretManager::global()
334 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
335 Ok(Some(format_desc))
336 }
337 None => Ok(None),
338 }
339 }
340
341 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
343 let columns = sink_catalog
344 .visible_columns()
345 .map(|col| col.column_desc.clone())
346 .collect();
347 let properties_with_secret = LocalSecretManager::global()
348 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
349 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
350 Ok(Self {
351 sink_id: sink_catalog.id,
352 sink_name: sink_catalog.name,
353 properties: properties_with_secret,
354 columns,
355 downstream_pk: sink_catalog.downstream_pk,
356 sink_type: sink_catalog.sink_type,
357 format_desc: format_desc_with_secret,
358 db_name: sink_catalog.db_name,
359 sink_from_name: sink_catalog.sink_from_name,
360 })
361 }
362}
363
364pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
365 use crate::enforce_secret::EnforceSecret;
366
367 let connector = props
368 .get_connector()
369 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
370 let key_iter = props.key_iter();
371 match_sink_name_str!(
372 connector.as_str(),
373 PropType,
374 PropType::enforce_secret(key_iter),
375 |other| bail!("connector '{}' is not supported", other)
376 )
377}
378
/// Process-wide [`SinkMetrics`]. They are registered on `GLOBAL_METRICS_REGISTRY` the first
/// time this static is accessed.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
381
/// Labeled Prometheus metric families shared by all sinks. The label names of each family
/// are fixed in [`SinkMetrics::new`].
#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store: write side.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store: read side.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg sink.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
407
408impl SinkMetrics {
409 pub fn new(registry: &Registry) -> Self {
410 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
411 "sink_commit_duration",
412 "Duration of commit op in sink",
413 &["actor_id", "connector", "sink_id", "sink_name"],
414 registry
415 )
416 .unwrap();
417
418 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
419 "connector_sink_rows_received",
420 "Number of rows received by sink",
421 &["actor_id", "connector_type", "sink_id", "sink_name"],
422 registry
423 )
424 .unwrap();
425
426 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
427 "log_store_first_write_epoch",
428 "The first write epoch of log store",
429 &["actor_id", "sink_id", "sink_name"],
430 registry
431 )
432 .unwrap();
433
434 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
435 "log_store_latest_write_epoch",
436 "The latest write epoch of log store",
437 &["actor_id", "sink_id", "sink_name"],
438 registry
439 )
440 .unwrap();
441
442 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
443 "log_store_write_rows",
444 "The write rate of rows",
445 &["actor_id", "sink_id", "sink_name"],
446 registry
447 )
448 .unwrap();
449
450 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
451 "log_store_latest_read_epoch",
452 "The latest read epoch of log store",
453 &["actor_id", "connector", "sink_id", "sink_name"],
454 registry
455 )
456 .unwrap();
457
458 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
459 "log_store_read_rows",
460 "The read rate of rows",
461 &["actor_id", "connector", "sink_id", "sink_name"],
462 registry
463 )
464 .unwrap();
465
466 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
467 "log_store_read_bytes",
468 "Total size of chunks read by log reader",
469 &["actor_id", "connector", "sink_id", "sink_name"],
470 registry
471 )
472 .unwrap();
473
474 let log_store_reader_wait_new_future_duration_ns =
475 register_guarded_int_counter_vec_with_registry!(
476 "log_store_reader_wait_new_future_duration_ns",
477 "Accumulated duration of LogReader to wait for next call to create future",
478 &["actor_id", "connector", "sink_id", "sink_name"],
479 registry
480 )
481 .unwrap();
482
483 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
484 "iceberg_write_qps",
485 "The qps of iceberg writer",
486 &["actor_id", "sink_id", "sink_name"],
487 registry
488 )
489 .unwrap();
490
491 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
492 "iceberg_write_latency",
493 "The latency of iceberg writer",
494 &["actor_id", "sink_id", "sink_name"],
495 registry
496 )
497 .unwrap();
498
499 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
500 "iceberg_rolling_unflushed_data_file",
501 "The unflushed data file count of iceberg rolling writer",
502 &["actor_id", "sink_id", "sink_name"],
503 registry
504 )
505 .unwrap();
506
507 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
508 "iceberg_position_delete_cache_num",
509 "The delete cache num of iceberg position delete writer",
510 &["actor_id", "sink_id", "sink_name"],
511 registry
512 )
513 .unwrap();
514
515 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
516 "iceberg_partition_num",
517 "The partition num of iceberg partition writer",
518 &["actor_id", "sink_id", "sink_name"],
519 registry
520 )
521 .unwrap();
522
523 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
524 "iceberg_write_bytes",
525 "The write bytes of iceberg writer",
526 &["actor_id", "sink_id", "sink_name"],
527 registry
528 )
529 .unwrap();
530
531 let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
532 "iceberg_snapshot_num",
533 "The snapshot number of iceberg table",
534 &["sink_name", "catalog_name", "table_name"],
535 registry
536 )
537 .unwrap();
538
539 Self {
540 sink_commit_duration,
541 connector_sink_rows_received,
542 log_store_first_write_epoch,
543 log_store_latest_write_epoch,
544 log_store_write_rows,
545 log_store_latest_read_epoch,
546 log_store_read_rows,
547 log_store_read_bytes,
548 log_store_reader_wait_new_future_duration_ns,
549 iceberg_write_qps,
550 iceberg_write_latency,
551 iceberg_rolling_unflushed_data_file,
552 iceberg_position_delete_cache_num,
553 iceberg_partition_num,
554 iceberg_write_bytes,
555 iceberg_snapshot_num,
556 }
557 }
558}
559
/// Per-writer parameters passed to `Sink::new_log_sinker`.
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: u64,
    /// Vnode bitmap of this writer, if any (presumably the vnodes it owns — confirm).
    pub vnode_bitmap: Option<Bitmap>,
    /// Meta-service client (or a mock). Provides sink-coordination RPC clients and
    /// sink-failure event logging.
    pub meta_client: Option<SinkMetaClient>,
    pub extra_partition_col_idx: Option<usize>,

    // `actor_id`, `connector`, `sink_id` and `sink_name` are used as metric label values by
    // `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}
578
/// Metric handles for a single sink writer, already bound to its label values.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
584
585impl SinkWriterMetrics {
586 pub fn new(writer_param: &SinkWriterParam) -> Self {
587 let labels = [
588 &writer_param.actor_id.to_string(),
589 writer_param.connector.as_str(),
590 &writer_param.sink_id.to_string(),
591 writer_param.sink_name.as_str(),
592 ];
593 let sink_commit_duration = GLOBAL_SINK_METRICS
594 .sink_commit_duration
595 .with_guarded_label_values(&labels);
596 let connector_sink_rows_received = GLOBAL_SINK_METRICS
597 .connector_sink_rows_received
598 .with_guarded_label_values(&labels);
599 Self {
600 sink_commit_duration,
601 connector_sink_rows_received,
602 }
603 }
604
605 #[cfg(test)]
606 pub fn for_test() -> Self {
607 Self {
608 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
609 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
610 }
611 }
612}
613
/// Meta-service client used by sink writers: either a real `MetaClient` or a
/// `MockMetaClient` (for tests).
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
619
620impl SinkMetaClient {
621 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
622 match self {
623 SinkMetaClient::MetaClient(meta_client) => {
624 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
625 meta_client.sink_coordinate_client().await,
626 )
627 }
628 SinkMetaClient::MockMetaClient(mock_meta_client) => {
629 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
630 mock_meta_client.sink_coordinate_client(),
631 )
632 }
633 }
634 }
635
636 pub async fn add_sink_fail_evet_log(
637 &self,
638 sink_id: u32,
639 sink_name: String,
640 connector: String,
641 error: String,
642 ) {
643 match self {
644 SinkMetaClient::MetaClient(meta_client) => {
645 match meta_client
646 .add_sink_fail_evet(sink_id, sink_name, connector, error)
647 .await
648 {
649 Ok(_) => {}
650 Err(e) => {
651 tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
652 }
653 }
654 }
655 SinkMetaClient::MockMetaClient(_) => {}
656 }
657 }
658}
659
660impl SinkWriterParam {
661 pub fn for_test() -> Self {
662 SinkWriterParam {
663 executor_id: Default::default(),
664 vnode_bitmap: Default::default(),
665 meta_client: Default::default(),
666 extra_partition_col_idx: Default::default(),
667
668 actor_id: 1,
669 sink_id: SinkId::new(1),
670 sink_name: "test_sink".to_owned(),
671 connector: "test_connector".to_owned(),
672 streaming_config: StreamingConfig::default(),
673 }
674 }
675}
676
677fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
678 matches!(
679 sink_name,
680 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
681 )
682}
683pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
684 const SINK_NAME: &'static str;
685
686 type LogSinker: LogSinker;
687 #[expect(deprecated)]
688 type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;
689
690 fn set_default_commit_checkpoint_interval(
691 desc: &mut SinkDesc,
692 user_specified: &SinkDecouple,
693 ) -> Result<()> {
694 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
695 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
696 Some(commit_checkpoint_interval) => {
697 let commit_checkpoint_interval = commit_checkpoint_interval
698 .parse::<u64>()
699 .map_err(|e| SinkError::Config(anyhow!(e)))?;
700 if matches!(user_specified, SinkDecouple::Disable)
701 && commit_checkpoint_interval > 1
702 {
703 return Err(SinkError::Config(anyhow!(
704 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
705 )));
706 }
707 }
708 None => match user_specified {
709 SinkDecouple::Default | SinkDecouple::Enable => {
710 if matches!(Self::SINK_NAME, ICEBERG_SINK) {
711 desc.properties.insert(
712 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
713 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
714 );
715 } else {
716 desc.properties.insert(
717 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
718 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
719 );
720 }
721 }
722 SinkDecouple::Disable => {
723 desc.properties.insert(
724 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
725 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
726 );
727 }
728 },
729 }
730 }
731 Ok(())
732 }
733
734 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
736 match user_specified {
737 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
738 SinkDecouple::Disable => Ok(false),
739 }
740 }
741
742 fn support_schema_change() -> bool {
743 false
744 }
745
746 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
747 Ok(())
748 }
749
750 async fn validate(&self) -> Result<()>;
751 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
752
753 fn is_coordinated_sink(&self) -> bool {
754 false
755 }
756
757 async fn new_coordinator(
758 &self,
759 _db: DatabaseConnection,
760 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
761 ) -> Result<Self::Coordinator> {
762 Err(SinkError::Coordinator(anyhow!("no coordinator")))
763 }
764}
765
/// Reader interface a [`LogSinker`] uses to consume a log store. It is implemented for
/// `&mut R` for any `R: LogReader`.
pub trait SinkLogReader: Send {
    /// Starts reading, optionally from `start_offset` (semantics follow
    /// `LogReader::start_from` — confirm there).
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Returns the next log store item paired with a `u64` (presumably the epoch the item
    /// belongs to — confirm against `LogReader::next_item`).
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Reports `offset` to the log store, presumably so data up to it may be truncated.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
782
783impl<R: LogReader> SinkLogReader for &mut R {
784 fn next_item(
785 &mut self,
786 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
787 <R as LogReader>::next_item(*self)
788 }
789
790 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
791 <R as LogReader>::truncate(*self, offset)
792 }
793
794 fn start_from(
795 &mut self,
796 start_offset: Option<u64>,
797 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
798 <R as LogReader>::start_from(*self, start_offset)
799 }
800}
801
/// Drains a log store into the external sink system.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Consumes `log_reader` and writes the data to the sink. The `Result<!>` return type
    /// means this only returns on error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared, thread-safe callback that, given a sink id, asynchronously yields a `u64` and a
/// receiver of further `u64`s. Presumably these are the current committed epoch and a
/// stream of subsequently committed epochs (TODO confirm with the provider).
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
813
/// Commit coordinator for coordinated sinks (see `Sink::is_coordinated_sink` and
/// `Sink::new_coordinator`).
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initializes the coordinator with a committed-epoch `subscriber`. Returns an optional
    /// epoch (presumably one to resume from — confirm with implementors).
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// Commits `epoch` using the writers' `metadata`. `add_columns` optionally carries
    /// newly added columns (presumably from a schema change — confirm).
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}
829
/// Placeholder coordinator used as the default `Sink::Coordinator`. It wraps `!`, so no
/// value of this type can ever be constructed.
#[deprecated]
pub struct NoSinkCommitCoordinator(!);
845
// Both methods are unreachable: `NoSinkCommitCoordinator` is uninhabited, so there is never
// a `self` to call them on.
#[expect(deprecated)]
#[async_trait]
impl SinkCommitCoordinator for NoSinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        unreachable!()
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        _metadata: Vec<SinkMetadata>,
        _add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        unreachable!()
    }
}
862
863impl SinkImpl {
864 pub fn new(mut param: SinkParam) -> Result<Self> {
865 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
866
867 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
869
870 let sink_type = param
871 .properties
872 .get(CONNECTOR_TYPE_KEY)
873 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
874
875 let sink_type = sink_type.to_lowercase();
876 match_sink_name_str!(
877 sink_type.as_str(),
878 SinkType,
879 Ok(SinkType::try_from(param)?.into()),
880 |other| {
881 Err(SinkError::Config(anyhow!(
882 "unsupported sink connector {}",
883 other
884 )))
885 }
886 )
887 }
888
889 pub fn is_sink_into_table(&self) -> bool {
890 matches!(self, SinkImpl::Table(_))
891 }
892
893 pub fn is_blackhole(&self) -> bool {
894 matches!(self, SinkImpl::BlackHole(_))
895 }
896
897 pub fn is_coordinated_sink(&self) -> bool {
898 dispatch_sink!(self, sink, sink.is_coordinated_sink())
899 }
900}
901
/// Builds the concrete sink described by `param`; equivalent to [`SinkImpl::new`].
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
905
// Defines `SinkImpl`, with one variant per `for_all_sinks!` entry holding the boxed concrete
// sink, plus a `From<ConcreteSink> for SinkImpl` impl per variant. With no arguments it
// re-invokes itself through `for_all_sinks!`.
macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
929
/// Result type of the sink module, with [`SinkError`] as the error type.
pub type Result<T> = std::result::Result<T, SinkError>;
931
/// Errors produced by sinks. Most variants are per-connector; `Internal`, `Connector` and
/// `Secret` can be created with `?` through `#[from]`.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Also produced from `RpcError` by the `From` impl in this module.
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    /// Also produced from `sea_orm::DbErr` by the `From` impl in this module.
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Invalid or conflicting sink configuration.
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Holds the report string of the underlying ClickHouse error.
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    /// Holds the report string of the underlying Redis error.
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    /// Report string of opendal, parquet or `ArrayError` failures (see the `From` impls
    /// in this module).
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Catch-all: any `anyhow::Error` converts into this variant.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1071
1072impl From<sea_orm::DbErr> for SinkError {
1073 fn from(err: sea_orm::DbErr) -> Self {
1074 SinkError::Iceberg(anyhow!(err))
1075 }
1076}
1077
1078impl From<OpendalError> for SinkError {
1079 fn from(error: OpendalError) -> Self {
1080 SinkError::File(error.to_report_string())
1081 }
1082}
1083
1084impl From<parquet::errors::ParquetError> for SinkError {
1085 fn from(error: parquet::errors::ParquetError) -> Self {
1086 SinkError::File(error.to_report_string())
1087 }
1088}
1089
1090impl From<ArrayError> for SinkError {
1091 fn from(error: ArrayError) -> Self {
1092 SinkError::File(error.to_report_string())
1093 }
1094}
1095
1096impl From<RpcError> for SinkError {
1097 fn from(value: RpcError) -> Self {
1098 SinkError::Remote(anyhow!(value))
1099 }
1100}
1101
1102impl From<ClickHouseError> for SinkError {
1103 fn from(value: ClickHouseError) -> Self {
1104 SinkError::ClickHouse(value.to_report_string())
1105 }
1106}
1107
1108#[cfg(feature = "sink-deltalake")]
1109impl From<::deltalake::DeltaTableError> for SinkError {
1110 fn from(value: ::deltalake::DeltaTableError) -> Self {
1111 SinkError::DeltaLake(anyhow!(value))
1112 }
1113}
1114
1115impl From<RedisError> for SinkError {
1116 fn from(value: RedisError) -> Self {
1117 SinkError::Redis(value.to_report_string())
1118 }
1119}
1120
1121impl From<tiberius::error::Error> for SinkError {
1122 fn from(err: tiberius::error::Error) -> Self {
1123 SinkError::SqlServer(anyhow!(err))
1124 }
1125}
1126
1127impl From<::elasticsearch::Error> for SinkError {
1128 fn from(err: ::elasticsearch::Error) -> Self {
1129 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1130 }
1131}
1132
1133impl From<::opensearch::Error> for SinkError {
1134 fn from(err: ::opensearch::Error) -> Self {
1135 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1136 }
1137}
1138
1139impl From<tokio_postgres::Error> for SinkError {
1140 fn from(err: tokio_postgres::Error) -> Self {
1141 SinkError::Postgres(anyhow!(err))
1142 }
1143}