1pub mod big_query;
15pub mod boxed;
16pub mod catalog;
17pub mod clickhouse;
18pub mod coordinate;
19pub mod decouple_checkpoint_log_sink;
20pub mod deltalake;
21pub mod doris;
22pub mod doris_starrocks_connector;
23pub mod dynamodb;
24pub mod elasticsearch_opensearch;
25pub mod encoder;
26pub mod file_sink;
27pub mod formatter;
28pub mod google_pubsub;
29pub mod iceberg;
30pub mod kafka;
31pub mod kinesis;
32use risingwave_common::bail;
33pub mod log_store;
34pub mod mock_coordination_client;
35pub mod mongodb;
36pub mod mqtt;
37pub mod nats;
38pub mod postgres;
39pub mod pulsar;
40pub mod redis;
41pub mod remote;
42pub mod sqlserver;
43pub mod starrocks;
44pub mod test_sink;
45pub mod trivial;
46pub mod utils;
47pub mod writer;
48pub mod prelude {
49 pub use crate::sink::{
50 Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
51 SinkParam, SinkWriterParam,
52 };
53}
54
55use std::collections::BTreeMap;
56use std::future::Future;
57use std::sync::{Arc, LazyLock};
58
59use ::clickhouse::error::Error as ClickHouseError;
60use ::redis::RedisError;
61use anyhow::anyhow;
62use async_trait::async_trait;
63use clickhouse::CLICKHOUSE_SINK;
64use decouple_checkpoint_log_sink::{
65 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
66 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
67};
68use deltalake::DELTALAKE_SINK;
69use futures::future::BoxFuture;
70use iceberg::ICEBERG_SINK;
71use opendal::Error as OpendalError;
72use prometheus::Registry;
73use risingwave_common::array::ArrayError;
74use risingwave_common::bitmap::Bitmap;
75use risingwave_common::catalog::{ColumnDesc, Field, Schema};
76use risingwave_common::config::StreamingConfig;
77use risingwave_common::hash::ActorId;
78use risingwave_common::metrics::{
79 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
80 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
81};
82use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
83use risingwave_common::secret::{LocalSecretManager, SecretError};
84use risingwave_common::session_config::sink_decouple::SinkDecouple;
85use risingwave_common::{
86 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
87 register_guarded_int_gauge_vec_with_registry,
88};
89use risingwave_pb::catalog::PbSinkType;
90use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
91use risingwave_rpc_client::MetaClient;
92use risingwave_rpc_client::error::RpcError;
93use sea_orm::DatabaseConnection;
94use starrocks::STARROCKS_SINK;
95use thiserror::Error;
96use thiserror_ext::AsReport;
97use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
98pub use tracing;
99
100use self::catalog::{SinkFormatDesc, SinkType};
101use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
102use crate::WithPropertiesExt;
103use crate::connector_common::IcebergSinkCompactionUpdate;
104use crate::error::{ConnectorError, ConnectorResult};
105use crate::sink::catalog::desc::SinkDesc;
106use crate::sink::catalog::{SinkCatalog, SinkId};
107use crate::sink::file_sink::fs::FsSink;
108use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
109use crate::sink::writer::SinkWriter;
110
/// Presumably the capacity of bounded channels created by sink implementations.
// NOTE(review): not referenced in this part of the file; child modules can reach it as
// `super::BOUNDED_CHANNEL_SIZE` — confirm its users before changing the value.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes the callback macro `$macro` with the complete list of sink connectors, followed by
/// any extra token trees given as `$arg`.
///
/// The list is passed as a single `{ ... }` group whose entries have the shape
/// `{ VariantName, SinkType, ConfigType }`; `ConfigType` is `()` for sinks without a dedicated
/// config struct. Each extra argument is appended as `, $arg`.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                // NOTE(review): unlike every other entry, `FsSink` below is not `$crate`-qualified,
                // so it resolves at the expansion site; callers must have `FsSink` in scope
                // (this module imports it).
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
155
/// Callback for `for_all_sinks!`: emits one `generate_config_use_single!` invocation per sink
/// entry, forwarding that entry's config-type tokens.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
164
/// Emits a `pub(super) use` for a single sink config type. It expands to nothing for `()`, the
/// placeholder used by sinks without a dedicated config struct.
#[macro_export]
macro_rules! generate_config_use_single {
    // `()` config slot: nothing to import.
    (()) => {};

    ($config_type:path) => {
        // An expansion site need not reference every imported config type.
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
176
/// Brings every sink config type into scope with `pub(super) use` by driving
/// `generate_config_use_clauses!` over the list from `for_all_sinks!`.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
184
/// Matches a `SinkImpl` value against every variant, binds the boxed inner sink to `$sink`, and
/// evaluates `$body` with it.
///
/// Callers use the second arm, e.g. `dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink())`.
/// The first arm is the callback form invoked through `for_all_sinks!` with the full sink list.
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
200
/// Matches a connector-name string against every sink's `Sink::SINK_NAME`.
///
/// On a match, it declares `type $type_name = <concrete sink type>;` and evaluates `$body`, so
/// the body can use the concrete sink type by that alias. Any other string is passed to
/// `$on_other_closure`, which is called immediately with it.
///
/// Callers use the second arm. The first arm is the callback form driven by `for_all_sinks!`.
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
221
/// Property key whose value selects the sink connector (read by `SinkImpl::new` and by
/// `SinkParam::from_proto` for legacy format detection).
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Legacy property key carrying the sink format type; `SinkParam::from_proto` reads it when the
/// protobuf has no explicit format descriptor.
pub const SINK_TYPE_OPTION: &str = "type";
// NOTE(review): not referenced in this part of the file; presumably toggles whether the sink
// starts from a snapshot — confirm.
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// Values for the `type` option (presumably interpreted by `SinkFormatDesc::from_legacy_type`).
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
/// Property key `force_append_only`; presumably lets users force an append-only sink — confirm.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
230
/// Everything needed to construct a concrete sink: identity, connector properties, columns,
/// downstream key, and output format. It converts to and from `PbSinkParam`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Connector properties, including the `connector` key. When built through
    /// `try_from_sink_catalog`, secret references are already filled in.
    pub properties: BTreeMap<String, String>,
    /// Column descriptors; `try_from_sink_catalog` keeps only the catalog's visible columns.
    pub columns: Vec<ColumnDesc>,
    /// Downstream primary-key positions (presumably indices into `columns` — confirm).
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    /// Output format; `None` when absent or, in `from_proto`, when conversion fails.
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// Name of the relation the sink reads from (presumably the upstream table/MV — confirm).
    pub sink_from_name: String,
}
249
250impl SinkParam {
251 pub fn from_proto(pb_param: PbSinkParam) -> Self {
252 let table_schema = pb_param.table_schema.expect("should contain table schema");
253 let format_desc = match pb_param.format_desc {
254 Some(f) => f.try_into().ok(),
255 None => {
256 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
257 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
258 match (connector, r#type) {
259 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
260 _ => None,
261 }
262 }
263 };
264 Self {
265 sink_id: SinkId::from(pb_param.sink_id),
266 sink_name: pb_param.sink_name,
267 properties: pb_param.properties,
268 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
269 downstream_pk: table_schema
270 .pk_indices
271 .iter()
272 .map(|i| *i as usize)
273 .collect(),
274 sink_type: SinkType::from_proto(
275 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
276 ),
277 format_desc,
278 db_name: pb_param.db_name,
279 sink_from_name: pb_param.sink_from_name,
280 }
281 }
282
283 pub fn to_proto(&self) -> PbSinkParam {
284 PbSinkParam {
285 sink_id: self.sink_id.sink_id,
286 sink_name: self.sink_name.clone(),
287 properties: self.properties.clone(),
288 table_schema: Some(TableSchema {
289 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
290 pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
291 }),
292 sink_type: self.sink_type.to_proto().into(),
293 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
294 db_name: self.db_name.clone(),
295 sink_from_name: self.sink_from_name.clone(),
296 }
297 }
298
299 pub fn schema(&self) -> Schema {
300 Schema {
301 fields: self.columns.iter().map(Field::from).collect(),
302 }
303 }
304
305 pub fn fill_secret_for_format_desc(
308 format_desc: Option<SinkFormatDesc>,
309 ) -> Result<Option<SinkFormatDesc>> {
310 match format_desc {
311 Some(mut format_desc) => {
312 format_desc.options = LocalSecretManager::global()
313 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
314 Ok(Some(format_desc))
315 }
316 None => Ok(None),
317 }
318 }
319
320 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
322 let columns = sink_catalog
323 .visible_columns()
324 .map(|col| col.column_desc.clone())
325 .collect();
326 let properties_with_secret = LocalSecretManager::global()
327 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
328 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
329 Ok(Self {
330 sink_id: sink_catalog.id,
331 sink_name: sink_catalog.name,
332 properties: properties_with_secret,
333 columns,
334 downstream_pk: sink_catalog.downstream_pk,
335 sink_type: sink_catalog.sink_type,
336 format_desc: format_desc_with_secret,
337 db_name: sink_catalog.db_name,
338 sink_from_name: sink_catalog.sink_from_name,
339 })
340 }
341}
342
343pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
344 use crate::enforce_secret::EnforceSecret;
345
346 let connector = props
347 .get_connector()
348 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
349 let key_iter = props.key_iter();
350 match_sink_name_str!(
351 connector.as_str(),
352 PropType,
353 PropType::enforce_secret(key_iter),
354 |other| bail!("connector '{}' is not supported", other)
355 )
356}
357
/// Process-wide sink metrics. They are created lazily on first access and registered in
/// `GLOBAL_METRICS_REGISTRY`.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
360
/// Label-guarded metric vectors for sink writers, log stores, and the Iceberg writer (grouped by
/// name prefix). The label names for each vector are defined in `SinkMetrics::new`.
#[derive(Clone)]
pub struct SinkMetrics {
    // Sink writer metrics.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store write side.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store read side.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg writer.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
}
385
386impl SinkMetrics {
387 pub fn new(registry: &Registry) -> Self {
388 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
389 "sink_commit_duration",
390 "Duration of commit op in sink",
391 &["actor_id", "connector", "sink_id", "sink_name"],
392 registry
393 )
394 .unwrap();
395
396 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
397 "connector_sink_rows_received",
398 "Number of rows received by sink",
399 &["actor_id", "connector_type", "sink_id", "sink_name"],
400 registry
401 )
402 .unwrap();
403
404 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
405 "log_store_first_write_epoch",
406 "The first write epoch of log store",
407 &["actor_id", "sink_id", "sink_name"],
408 registry
409 )
410 .unwrap();
411
412 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
413 "log_store_latest_write_epoch",
414 "The latest write epoch of log store",
415 &["actor_id", "sink_id", "sink_name"],
416 registry
417 )
418 .unwrap();
419
420 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
421 "log_store_write_rows",
422 "The write rate of rows",
423 &["actor_id", "sink_id", "sink_name"],
424 registry
425 )
426 .unwrap();
427
428 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
429 "log_store_latest_read_epoch",
430 "The latest read epoch of log store",
431 &["actor_id", "connector", "sink_id", "sink_name"],
432 registry
433 )
434 .unwrap();
435
436 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
437 "log_store_read_rows",
438 "The read rate of rows",
439 &["actor_id", "connector", "sink_id", "sink_name"],
440 registry
441 )
442 .unwrap();
443
444 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
445 "log_store_read_bytes",
446 "Total size of chunks read by log reader",
447 &["actor_id", "connector", "sink_id", "sink_name"],
448 registry
449 )
450 .unwrap();
451
452 let log_store_reader_wait_new_future_duration_ns =
453 register_guarded_int_counter_vec_with_registry!(
454 "log_store_reader_wait_new_future_duration_ns",
455 "Accumulated duration of LogReader to wait for next call to create future",
456 &["actor_id", "connector", "sink_id", "sink_name"],
457 registry
458 )
459 .unwrap();
460
461 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
462 "iceberg_write_qps",
463 "The qps of iceberg writer",
464 &["actor_id", "sink_id", "sink_name"],
465 registry
466 )
467 .unwrap();
468
469 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
470 "iceberg_write_latency",
471 "The latency of iceberg writer",
472 &["actor_id", "sink_id", "sink_name"],
473 registry
474 )
475 .unwrap();
476
477 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
478 "iceberg_rolling_unflushed_data_file",
479 "The unflushed data file count of iceberg rolling writer",
480 &["actor_id", "sink_id", "sink_name"],
481 registry
482 )
483 .unwrap();
484
485 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
486 "iceberg_position_delete_cache_num",
487 "The delete cache num of iceberg position delete writer",
488 &["actor_id", "sink_id", "sink_name"],
489 registry
490 )
491 .unwrap();
492
493 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
494 "iceberg_partition_num",
495 "The partition num of iceberg partition writer",
496 &["actor_id", "sink_id", "sink_name"],
497 registry
498 )
499 .unwrap();
500
501 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
502 "iceberg_write_bytes",
503 "The write bytes of iceberg writer",
504 &["actor_id", "sink_id", "sink_name"],
505 registry
506 )
507 .unwrap();
508
509 Self {
510 sink_commit_duration,
511 connector_sink_rows_received,
512 log_store_first_write_epoch,
513 log_store_latest_write_epoch,
514 log_store_write_rows,
515 log_store_latest_read_epoch,
516 log_store_read_rows,
517 log_store_read_bytes,
518 log_store_reader_wait_new_future_duration_ns,
519 iceberg_write_qps,
520 iceberg_write_latency,
521 iceberg_rolling_unflushed_data_file,
522 iceberg_position_delete_cache_num,
523 iceberg_partition_num,
524 iceberg_write_bytes,
525 }
526 }
527}
528
/// Per-writer parameters passed to `Sink::new_log_sinker`.
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: u64,
    /// Presumably the vnodes owned by this writer's actor, when known — confirm.
    pub vnode_bitmap: Option<Bitmap>,
    /// Client for obtaining a sink coordination client (see `SinkMetaClient`); `for_test` leaves
    /// it `None`.
    pub meta_client: Option<SinkMetaClient>,
    pub extra_partition_col_idx: Option<usize>,

    // Identity of the writer; `SinkWriterMetrics::new` uses actor_id, connector, sink_id and
    // sink_name as metric label values.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}
547
/// Metric handles bound to one writer's label values (actor id, connector, sink id, sink name).
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
553
554impl SinkWriterMetrics {
555 pub fn new(writer_param: &SinkWriterParam) -> Self {
556 let labels = [
557 &writer_param.actor_id.to_string(),
558 writer_param.connector.as_str(),
559 &writer_param.sink_id.to_string(),
560 writer_param.sink_name.as_str(),
561 ];
562 let sink_commit_duration = GLOBAL_SINK_METRICS
563 .sink_commit_duration
564 .with_guarded_label_values(&labels);
565 let connector_sink_rows_received = GLOBAL_SINK_METRICS
566 .connector_sink_rows_received
567 .with_guarded_label_values(&labels);
568 Self {
569 sink_commit_duration,
570 connector_sink_rows_received,
571 }
572 }
573
574 #[cfg(test)]
575 pub fn for_test() -> Self {
576 Self {
577 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
578 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
579 }
580 }
581}
582
/// Either a real meta-service client or a mock one. Both can produce a sink coordination client
/// (`sink_coordinate_client`).
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
588
589impl SinkMetaClient {
590 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
591 match self {
592 SinkMetaClient::MetaClient(meta_client) => {
593 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
594 meta_client.sink_coordinate_client().await,
595 )
596 }
597 SinkMetaClient::MockMetaClient(mock_meta_client) => {
598 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
599 mock_meta_client.sink_coordinate_client(),
600 )
601 }
602 }
603 }
604
605 pub async fn add_sink_fail_evet_log(
606 &self,
607 sink_id: u32,
608 sink_name: String,
609 connector: String,
610 error: String,
611 ) {
612 match self {
613 SinkMetaClient::MetaClient(meta_client) => {
614 match meta_client
615 .add_sink_fail_evet(sink_id, sink_name, connector, error)
616 .await
617 {
618 Ok(_) => {}
619 Err(e) => {
620 tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log.");
621 }
622 }
623 }
624 SinkMetaClient::MockMetaClient(_) => {}
625 }
626 }
627}
628
629impl SinkWriterParam {
630 pub fn for_test() -> Self {
631 SinkWriterParam {
632 executor_id: Default::default(),
633 vnode_bitmap: Default::default(),
634 meta_client: Default::default(),
635 extra_partition_col_idx: Default::default(),
636
637 actor_id: 1,
638 sink_id: SinkId::new(1),
639 sink_name: "test_sink".to_owned(),
640 connector: "test_connector".to_owned(),
641 streaming_config: StreamingConfig::default(),
642 }
643 }
644}
645
646fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
647 matches!(
648 sink_name,
649 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
650 )
651}
652pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
653 const SINK_NAME: &'static str;
654
655 type LogSinker: LogSinker;
656 #[expect(deprecated)]
657 type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;
658
659 fn set_default_commit_checkpoint_interval(
660 desc: &mut SinkDesc,
661 user_specified: &SinkDecouple,
662 ) -> Result<()> {
663 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
664 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
665 Some(commit_checkpoint_interval) => {
666 let commit_checkpoint_interval = commit_checkpoint_interval
667 .parse::<u64>()
668 .map_err(|e| SinkError::Config(anyhow!(e)))?;
669 if matches!(user_specified, SinkDecouple::Disable)
670 && commit_checkpoint_interval > 1
671 {
672 return Err(SinkError::Config(anyhow!(
673 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
674 )));
675 }
676 }
677 None => match user_specified {
678 SinkDecouple::Default | SinkDecouple::Enable => {
679 desc.properties.insert(
680 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
681 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
682 );
683 }
684 SinkDecouple::Disable => {
685 desc.properties.insert(
686 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
687 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
688 );
689 }
690 },
691 }
692 }
693 Ok(())
694 }
695
696 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
698 match user_specified {
699 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
700 SinkDecouple::Disable => Ok(false),
701 }
702 }
703
704 fn support_schema_change() -> bool {
705 false
706 }
707
708 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
709 Ok(())
710 }
711
712 async fn validate(&self) -> Result<()>;
713 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
714
715 fn is_coordinated_sink(&self) -> bool {
716 false
717 }
718
719 async fn new_coordinator(
720 &self,
721 _db: DatabaseConnection,
722 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
723 ) -> Result<Self::Coordinator> {
724 Err(SinkError::Coordinator(anyhow!("no coordinator")))
725 }
726}
727
/// The reading interface a `LogSinker` uses to pull items from a log store.
pub trait SinkLogReader: Send {
    /// Starts reading, optionally from `start_offset`.
    // NOTE(review): the exact meaning of `start_offset` is defined by the underlying
    // `LogReader` implementation — confirm there.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Resolves to the next log-store item paired with a `u64` (presumably its epoch — confirm).
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log store up to `offset` (presumably once the data up to there has been
    /// written by the sink — confirm).
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
744
745impl<R: LogReader> SinkLogReader for &mut R {
746 fn next_item(
747 &mut self,
748 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
749 <R as LogReader>::next_item(*self)
750 }
751
752 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
753 <R as LogReader>::truncate(*self, offset)
754 }
755
756 fn start_from(
757 &mut self,
758 start_offset: Option<u64>,
759 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
760 <R as LogReader>::start_from(*self, start_offset)
761 }
762}
763
/// Drives a sink by consuming items from a `SinkLogReader`.
///
/// `consume_log_and_sink` returns `Result<!>`, so it can never finish successfully; it only
/// returns with an error.
#[async_trait]
pub trait LogSinker: 'static + Send {
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared callback that, given a sink id, asynchronously yields a `u64` together with a
/// receiver of further `u64` values.
// NOTE(review): per the type name, presumably the currently committed epoch plus a stream of
// subsequently committed epochs — confirm against the implementation.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
775
/// Coordinator for sinks that commit centrally (see `Sink::is_coordinated_sink`).
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initializes the coordinator with an epoch subscriber. It may return a `u64`, presumably
    /// an epoch to resume from — confirm.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// Commits `epoch` using the given metadata (presumably gathered from the writers).
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}
786
/// Placeholder `Sink::Coordinator` for sinks without coordination. It wraps `!`, so no value of
/// this type can ever exist.
#[deprecated]
pub struct NoSinkCommitCoordinator(!);
802
// `NoSinkCommitCoordinator` is uninhabited, so these methods can never actually be invoked.
#[expect(deprecated)]
#[async_trait]
impl SinkCommitCoordinator for NoSinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        unreachable!()
    }

    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
        unreachable!()
    }
}
814
815impl SinkImpl {
816 pub fn new(mut param: SinkParam) -> Result<Self> {
817 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
818
819 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
821
822 let sink_type = param
823 .properties
824 .get(CONNECTOR_TYPE_KEY)
825 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
826
827 let sink_type = sink_type.to_lowercase();
828 match_sink_name_str!(
829 sink_type.as_str(),
830 SinkType,
831 Ok(SinkType::try_from(param)?.into()),
832 |other| {
833 Err(SinkError::Config(anyhow!(
834 "unsupported sink connector {}",
835 other
836 )))
837 }
838 )
839 }
840
841 pub fn is_sink_into_table(&self) -> bool {
842 matches!(self, SinkImpl::Table(_))
843 }
844
845 pub fn is_blackhole(&self) -> bool {
846 matches!(self, SinkImpl::BlackHole(_))
847 }
848
849 pub fn is_coordinated_sink(&self) -> bool {
850 dispatch_sink!(self, sink, sink.is_coordinated_sink())
851 }
852}
853
/// Builds a sink from `param`; a free-function form of `SinkImpl::new`.
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
857
// Generates the `SinkImpl` enum, with one boxed variant per entry of `for_all_sinks!`, plus a
// `From<concrete sink> for SinkImpl` impl per entry. Not exported; invoked once right after.
macro_rules! def_sink_impl {
    // Entry point: feed the full sink list back into this macro.
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        /// One variant per supported sink connector, each holding its concrete sink boxed.
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
881
/// Result type of the sink module, with `SinkError` as the error.
pub type Result<T> = std::result::Result<T, SinkError>;
883
/// Errors produced by sinks.
///
/// Most variants tag an underlying error or message with the connector it came from.
/// `Internal` and `Connector` are transparent wrappers that display their inner error as-is.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Invalid or conflicting sink configuration.
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Failure in sink commit coordination (also the default `Sink::new_coordinator` error).
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Catch-all for any `anyhow::Error` converted via `From`/`?`; displayed transparently.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Wraps a `ConnectorError`; displayed transparently.
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    /// Failure while filling secret references.
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1023
1024impl From<sea_orm::DbErr> for SinkError {
1025 fn from(err: sea_orm::DbErr) -> Self {
1026 SinkError::Iceberg(anyhow!(err))
1027 }
1028}
1029
1030impl From<OpendalError> for SinkError {
1031 fn from(error: OpendalError) -> Self {
1032 SinkError::File(error.to_report_string())
1033 }
1034}
1035
1036impl From<parquet::errors::ParquetError> for SinkError {
1037 fn from(error: parquet::errors::ParquetError) -> Self {
1038 SinkError::File(error.to_report_string())
1039 }
1040}
1041
1042impl From<ArrayError> for SinkError {
1043 fn from(error: ArrayError) -> Self {
1044 SinkError::File(error.to_report_string())
1045 }
1046}
1047
1048impl From<RpcError> for SinkError {
1049 fn from(value: RpcError) -> Self {
1050 SinkError::Remote(anyhow!(value))
1051 }
1052}
1053
1054impl From<ClickHouseError> for SinkError {
1055 fn from(value: ClickHouseError) -> Self {
1056 SinkError::ClickHouse(value.to_report_string())
1057 }
1058}
1059
1060#[cfg(feature = "sink-deltalake")]
1061impl From<::deltalake::DeltaTableError> for SinkError {
1062 fn from(value: ::deltalake::DeltaTableError) -> Self {
1063 SinkError::DeltaLake(anyhow!(value))
1064 }
1065}
1066
1067impl From<RedisError> for SinkError {
1068 fn from(value: RedisError) -> Self {
1069 SinkError::Redis(value.to_report_string())
1070 }
1071}
1072
1073impl From<tiberius::error::Error> for SinkError {
1074 fn from(err: tiberius::error::Error) -> Self {
1075 SinkError::SqlServer(anyhow!(err))
1076 }
1077}
1078
1079impl From<::elasticsearch::Error> for SinkError {
1080 fn from(err: ::elasticsearch::Error) -> Self {
1081 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1082 }
1083}
1084
1085impl From<::opensearch::Error> for SinkError {
1086 fn from(err: ::opensearch::Error) -> Self {
1087 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1088 }
1089}
1090
1091impl From<tokio_postgres::Error> for SinkError {
1092 fn from(err: tokio_postgres::Error) -> Self {
1093 SinkError::Postgres(anyhow!(err))
1094 }
1095}