1feature_gated_sink_mod!(big_query, "bigquery");
16pub mod boxed;
17pub mod catalog;
18feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
19pub mod coordinate;
20pub mod decouple_checkpoint_log_sink;
21feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
22feature_gated_sink_mod!(doris, "doris");
23#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
24pub mod doris_starrocks_connector;
25pub mod dynamodb;
26pub mod elasticsearch_opensearch;
27pub mod encoder;
28pub mod file_sink;
29pub mod formatter;
30feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
31pub mod http;
32pub mod iceberg;
33pub mod kafka;
34pub mod kinesis;
35use risingwave_common::bail;
36use risingwave_pb::stream_plan::PbSinkSchemaChange;
37pub mod jdbc_jni_client;
38pub mod log_store;
39pub mod mock_coordination_client;
40pub mod mongodb;
41pub mod mqtt;
42pub mod nats;
43pub mod postgres;
44pub mod pulsar;
45pub mod redis;
46pub mod remote;
47pub mod snowflake_redshift;
48pub mod sqlserver;
49feature_gated_sink_mod!(starrocks, "starrocks");
50pub mod test_sink;
51pub mod trivial;
52pub mod turbopuffer;
53pub mod utils;
54pub mod writer;
55pub mod prelude {
56 pub use crate::sink::{
57 Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
58 SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
59 };
60}
61
62use std::collections::BTreeMap;
63use std::future::Future;
64use std::sync::{Arc, LazyLock};
65
66use ::redis::RedisError;
67use anyhow::anyhow;
68use async_trait::async_trait;
69use chrono_tz::{Tz, UTC};
70use decouple_checkpoint_log_sink::{
71 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
72 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
73};
74use futures::future::BoxFuture;
75use opendal::Error as OpendalError;
76use prometheus::Registry;
77use risingwave_common::array::ArrayError;
78use risingwave_common::bitmap::Bitmap;
79use risingwave_common::catalog::{ColumnDesc, Field, Schema};
80use risingwave_common::config::StreamingConfig;
81use risingwave_common::hash::ActorId;
82use risingwave_common::metrics::{
83 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
84 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
85};
86use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
87use risingwave_common::secret::{LocalSecretManager, SecretError};
88use risingwave_common::session_config::sink_decouple::SinkDecouple;
89use risingwave_common::{
90 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
91 register_guarded_int_gauge_vec_with_registry,
92};
93use risingwave_pb::catalog::PbSinkType;
94use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
95use risingwave_pb::id::ExecutorId;
96use risingwave_rpc_client::MetaClient;
97use risingwave_rpc_client::error::RpcError;
98use starrocks::STARROCKS_SINK;
99use thiserror::Error;
100use thiserror_ext::AsReport;
101use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
102pub use tracing;
103
104use self::catalog::{SinkFormatDesc, SinkType};
105use self::clickhouse::CLICKHOUSE_SINK;
106use self::deltalake::DELTALAKE_SINK;
107use self::iceberg::ICEBERG_SINK;
108use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
109use crate::WithPropertiesExt;
110use crate::connector_common::IcebergSinkCompactionUpdate;
111use crate::error::{ConnectorError, ConnectorResult};
112use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
113use crate::sink::catalog::desc::SinkDesc;
114use crate::sink::catalog::{SinkCatalog, SinkId};
115use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
116use crate::sink::file_sink::fs::FsSink;
117use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
118use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
119use crate::sink::utils::feature_gated_sink_mod;
120
// Capacity constant for bounded channels. It is not referenced in this file;
// presumably used by sink submodules via `super::` — confirm before changing.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes `$macro!` with the registry of every sink connector in this crate,
/// followed by any extra `$arg` tokens.
///
/// The registry is a brace-delimited list of `{ Variant, SinkType, ConfigType }`
/// triples:
/// - `Variant` becomes the variant name of `SinkImpl` (see `def_sink_impl!`);
/// - `SinkType` is the concrete type implementing `Sink`;
/// - `ConfigType` is the sink's config type, or `()` when it has none.
///
/// `def_sink_impl!`, `dispatch_sink!`, `match_sink_name_str!` and
/// `use_all_sink_configs!` are all driven by this list, so adding a connector
/// here wires it into all of them.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Http, $crate::sink::http::HttpSink, $crate::sink::http::HttpConfig },
                { Turbopuffer, $crate::sink::turbopuffer::TurbopufferSink, $crate::sink::turbopuffer::TurbopufferConfig },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                // Internal sinks (no user-facing config type).
                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
169
/// Callback for `for_all_sinks!`: for every `{ Variant, SinkType, ConfigType }`
/// entry, forwards the config-type tokens to `generate_config_use_single!`.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
178
/// Emits a `pub(super) use` of one sink config type.
///
/// `()` (a sink without a config type) expands to nothing. Any other path is
/// re-exported into the invoking module's parent scope. `unused_imports` is
/// allowed because not every generated re-export is necessarily used.
#[macro_export]
macro_rules! generate_config_use_single {
    // Sink has no config type: nothing to import.
    (()) => {};

    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
190
/// Brings every sink config type listed in `for_all_sinks!` into scope as a
/// `pub(super)` re-export at the invocation site.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
198
/// Statically dispatches over every `SinkImpl` variant.
///
/// Usage: `dispatch_sink!(sink_impl, sink, body)`. It matches `sink_impl`
/// against each variant, binds the variant's payload (a boxed concrete sink)
/// to `sink`, and evaluates `body`. Because each arm is expanded separately,
/// `body` can call methods of the concrete sink type.
///
/// The first arm is internal: it receives the sink registry from
/// `for_all_sinks!`. The second arm is the public entry point.
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
214
/// Resolves a sink connector name string to its concrete sink type.
///
/// Usage: `match_sink_name_str!(name, T, body, on_other)`. It compares `name`
/// against every registered sink's `Sink::SINK_NAME` with an exact string
/// pattern match, so it is case-sensitive.
/// - On a match, `T` is declared as a type alias for that sink type and
///   `body` is evaluated.
/// - Otherwise `on_other` (a closure) is called with the unmatched name.
///
/// The first arm is internal: it receives the sink registry from
/// `for_all_sinks!`. The second arm is the public entry point.
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
235
/// Property key naming the sink connector (e.g. the value matched against
/// `Sink::SINK_NAME` in `SinkImpl::new`).
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Legacy property key for the sink type/format. `SinkParam::from_proto` uses it,
/// together with `connector`, to derive a format description when none is given.
pub const SINK_TYPE_OPTION: &str = "type";
/// Property key for the `snapshot` option — semantics not visible in this file.
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// Values accepted for `SINK_TYPE_OPTION`.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
/// User option key; presumably sets `SinkParam::ignore_delete` — confirm where parsed.
pub const SINK_USER_IGNORE_DELETE_OPTION: &str = "ignore_delete";
/// User option key to force append-only output (parsed elsewhere).
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
/// User option key to force compaction (parsed elsewhere).
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";
249
/// Everything a sink implementation needs in order to be constructed.
///
/// This covers identity, connector properties, schema and output format. It
/// can be built from the catalog (`SinkParam::try_from_sink_catalog`) or from
/// protobuf (`SinkParam::from_proto`), and converted back with
/// `SinkParam::to_proto`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Connector properties, including `connector`. When built from the
    /// catalog, secret references have already been resolved into values.
    pub properties: BTreeMap<String, String>,
    /// Column descriptors. When built from the catalog, only visible columns.
    pub columns: Vec<ColumnDesc>,
    /// Downstream primary-key indices, presumably into `columns` — confirm.
    /// `None` when unspecified; an empty protobuf `pk_indices` maps to `None`.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    /// Presumably: drop DELETE operations instead of emitting them — confirm.
    pub ignore_delete: bool,
    /// Output format/encode description, if any (secrets filled when built
    /// from the catalog).
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// Presumably the name of the relation the sink reads from — confirm.
    pub sink_from_name: String,
}
271
272impl SinkParam {
273 pub fn from_proto(pb_param: PbSinkParam) -> Self {
274 let ignore_delete = pb_param.ignore_delete();
275 let table_schema = pb_param.table_schema.expect("should contain table schema");
276 let format_desc = match pb_param.format_desc {
277 Some(f) => f.try_into().ok(),
278 None => {
279 let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
280 let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
281 match (connector, r#type) {
282 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
283 _ => None,
284 }
285 }
286 };
287 Self {
288 sink_id: SinkId::from(pb_param.sink_id),
289 sink_name: pb_param.sink_name,
290 properties: pb_param.properties,
291 columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
292 downstream_pk: if table_schema.pk_indices.is_empty() {
293 None
294 } else {
295 Some(
296 (table_schema.pk_indices.iter())
297 .map(|i| *i as usize)
298 .collect(),
299 )
300 },
301 sink_type: SinkType::from_proto(
302 PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
303 ),
304 ignore_delete,
305 format_desc,
306 db_name: pb_param.db_name,
307 sink_from_name: pb_param.sink_from_name,
308 }
309 }
310
311 pub fn to_proto(&self) -> PbSinkParam {
312 PbSinkParam {
313 sink_id: self.sink_id,
314 sink_name: self.sink_name.clone(),
315 properties: self.properties.clone(),
316 table_schema: Some(TableSchema {
317 columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
318 pk_indices: (self.downstream_pk.as_ref())
319 .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
320 }),
321 sink_type: self.sink_type.to_proto().into(),
322 format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
323 db_name: self.db_name.clone(),
324 sink_from_name: self.sink_from_name.clone(),
325 raw_ignore_delete: self.ignore_delete,
326 }
327 }
328
329 pub fn schema(&self) -> Schema {
330 Schema {
331 fields: self.columns.iter().map(Field::from).collect(),
332 }
333 }
334
335 pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
341 self.downstream_pk.clone().unwrap_or_default()
342 }
343
344 pub fn fill_secret_for_format_desc(
347 format_desc: Option<SinkFormatDesc>,
348 ) -> Result<Option<SinkFormatDesc>> {
349 match format_desc {
350 Some(mut format_desc) => {
351 format_desc.options = LocalSecretManager::global()
352 .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
353 Ok(Some(format_desc))
354 }
355 None => Ok(None),
356 }
357 }
358
359 pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
361 let columns = sink_catalog
362 .visible_columns()
363 .map(|col| col.column_desc.clone())
364 .collect();
365 let properties_with_secret = LocalSecretManager::global()
366 .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
367 let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
368 Ok(Self {
369 sink_id: sink_catalog.id,
370 sink_name: sink_catalog.name,
371 properties: properties_with_secret,
372 columns,
373 downstream_pk: sink_catalog.downstream_pk,
374 sink_type: sink_catalog.sink_type,
375 ignore_delete: sink_catalog.ignore_delete,
376 format_desc: format_desc_with_secret,
377 db_name: sink_catalog.db_name,
378 sink_from_name: sink_catalog.sink_from_name,
379 })
380 }
381}
382
383pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
384 use crate::enforce_secret::EnforceSecret;
385
386 let connector = props
387 .get_connector()
388 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
389 let key_iter = props.key_iter();
390 match_sink_name_str!(
391 connector.as_str(),
392 PropType,
393 PropType::enforce_secret(key_iter),
394 |other| bail!("connector '{}' is not supported", other)
395 )
396}
397
/// Process-wide sink metrics. They are lazily registered on
/// `GLOBAL_METRICS_REGISTRY` the first time this static is accessed.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
400
/// Labeled metric families for sinks, log stores and the Iceberg writer.
///
/// Metric names, help texts and label sets are defined in `SinkMetrics::new`.
#[derive(Clone)]
pub struct SinkMetrics {
    // Sink-level metrics.
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store write side.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store read side.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg writer metrics.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
426
427impl SinkMetrics {
428 pub fn new(registry: &Registry) -> Self {
429 let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
430 "sink_commit_duration",
431 "Duration of commit op in sink",
432 &["actor_id", "connector", "sink_id", "sink_name"],
433 registry
434 )
435 .unwrap();
436
437 let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
438 "connector_sink_rows_received",
439 "Number of rows received by sink",
440 &["actor_id", "connector_type", "sink_id", "sink_name"],
441 registry
442 )
443 .unwrap();
444
445 let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
446 "log_store_first_write_epoch",
447 "The first write epoch of log store",
448 &["actor_id", "sink_id", "sink_name"],
449 registry
450 )
451 .unwrap();
452
453 let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
454 "log_store_latest_write_epoch",
455 "The latest write epoch of log store",
456 &["actor_id", "sink_id", "sink_name"],
457 registry
458 )
459 .unwrap();
460
461 let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
462 "log_store_write_rows",
463 "The write rate of rows",
464 &["actor_id", "sink_id", "sink_name"],
465 registry
466 )
467 .unwrap();
468
469 let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
470 "log_store_latest_read_epoch",
471 "The latest read epoch of log store",
472 &["actor_id", "connector", "sink_id", "sink_name"],
473 registry
474 )
475 .unwrap();
476
477 let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
478 "log_store_read_rows",
479 "The read rate of rows",
480 &["actor_id", "connector", "sink_id", "sink_name"],
481 registry
482 )
483 .unwrap();
484
485 let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
486 "log_store_read_bytes",
487 "Total size of chunks read by log reader",
488 &["actor_id", "connector", "sink_id", "sink_name"],
489 registry
490 )
491 .unwrap();
492
493 let log_store_reader_wait_new_future_duration_ns =
494 register_guarded_int_counter_vec_with_registry!(
495 "log_store_reader_wait_new_future_duration_ns",
496 "Accumulated duration of LogReader to wait for next call to create future",
497 &["actor_id", "connector", "sink_id", "sink_name"],
498 registry
499 )
500 .unwrap();
501
502 let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
503 "iceberg_write_qps",
504 "The qps of iceberg writer",
505 &["actor_id", "sink_id", "sink_name"],
506 registry
507 )
508 .unwrap();
509
510 let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
511 "iceberg_write_latency",
512 "The latency of iceberg writer",
513 &["actor_id", "sink_id", "sink_name"],
514 registry
515 )
516 .unwrap();
517
518 let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
519 "iceberg_rolling_unflushed_data_file",
520 "The unflushed data file count of iceberg rolling writer",
521 &["actor_id", "sink_id", "sink_name"],
522 registry
523 )
524 .unwrap();
525
526 let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
527 "iceberg_position_delete_cache_num",
528 "The delete cache num of iceberg position delete writer",
529 &["actor_id", "sink_id", "sink_name"],
530 registry
531 )
532 .unwrap();
533
534 let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
535 "iceberg_partition_num",
536 "The partition num of iceberg partition writer",
537 &["actor_id", "sink_id", "sink_name"],
538 registry
539 )
540 .unwrap();
541
542 let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
543 "iceberg_write_bytes",
544 "The write bytes of iceberg writer",
545 &["actor_id", "sink_id", "sink_name"],
546 registry
547 )
548 .unwrap();
549
550 let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
551 "iceberg_snapshot_num",
552 "The snapshot number of iceberg table",
553 &["sink_name", "catalog_name", "table_name"],
554 registry
555 )
556 .unwrap();
557
558 Self {
559 sink_commit_duration,
560 connector_sink_rows_received,
561 log_store_first_write_epoch,
562 log_store_latest_write_epoch,
563 log_store_write_rows,
564 log_store_latest_read_epoch,
565 log_store_read_rows,
566 log_store_read_bytes,
567 log_store_reader_wait_new_future_duration_ns,
568 iceberg_write_qps,
569 iceberg_write_latency,
570 iceberg_rolling_unflushed_data_file,
571 iceberg_position_delete_cache_num,
572 iceberg_partition_num,
573 iceberg_write_bytes,
574 iceberg_snapshot_num,
575 }
576 }
577}
578
/// Per-writer context handed to `Sink::new_log_sinker`.
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: ExecutorId,
    // Presumably the vnodes owned by this writer's actor — confirm.
    pub vnode_bitmap: Option<Bitmap>,
    /// Meta client used for sink coordination and event logging; `None` in tests.
    pub meta_client: Option<SinkMetaClient>,
    // Purpose not visible in this file — presumably an extra partition column
    // index for partitioned sinks; confirm with users of this field.
    pub extra_partition_col_idx: Option<usize>,

    // Identity used (among others) as metric labels by `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
    pub time_zone: Tz,
}
598
/// Metric handles for a single sink writer. They are labeled with
/// `(actor_id, connector, sink_id, sink_name)` taken from its
/// [`SinkWriterParam`].
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
604
605impl SinkWriterMetrics {
606 pub fn new(writer_param: &SinkWriterParam) -> Self {
607 let labels = [
608 &writer_param.actor_id.to_string(),
609 writer_param.connector.as_str(),
610 &writer_param.sink_id.to_string(),
611 writer_param.sink_name.as_str(),
612 ];
613 let sink_commit_duration = GLOBAL_SINK_METRICS
614 .sink_commit_duration
615 .with_guarded_label_values(&labels);
616 let connector_sink_rows_received = GLOBAL_SINK_METRICS
617 .connector_sink_rows_received
618 .with_guarded_label_values(&labels);
619 Self {
620 sink_commit_duration,
621 connector_sink_rows_received,
622 }
623 }
624
625 #[cfg(test)]
626 pub fn for_test() -> Self {
627 Self {
628 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
629 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
630 }
631 }
632}
633
/// Meta-service client as seen by sinks: either a real [`MetaClient`] or a
/// [`MockMetaClient`] (e.g. for tests).
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
639
640impl SinkMetaClient {
641 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
642 match self {
643 SinkMetaClient::MetaClient(meta_client) => {
644 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
645 meta_client.sink_coordinate_client().await,
646 )
647 }
648 SinkMetaClient::MockMetaClient(mock_meta_client) => {
649 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
650 mock_meta_client.sink_coordinate_client(),
651 )
652 }
653 }
654 }
655
656 pub async fn add_sink_fail_evet_log(
657 &self,
658 sink_id: SinkId,
659 sink_name: String,
660 connector: String,
661 error: String,
662 ) {
663 match self {
664 SinkMetaClient::MetaClient(meta_client) => {
665 match meta_client
666 .add_sink_fail_evet(sink_id, sink_name, connector, error)
667 .await
668 {
669 Ok(_) => {}
670 Err(e) => {
671 tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
672 }
673 }
674 }
675 SinkMetaClient::MockMetaClient(_) => {}
676 }
677 }
678}
679
680impl SinkWriterParam {
681 pub fn for_test() -> Self {
682 SinkWriterParam {
683 executor_id: Default::default(),
684 vnode_bitmap: Default::default(),
685 meta_client: Default::default(),
686 extra_partition_col_idx: Default::default(),
687
688 actor_id: 1.into(),
689 sink_id: SinkId::new(1),
690 sink_name: "test_sink".to_owned(),
691 connector: "test_connector".to_owned(),
692 streaming_config: StreamingConfig::default(),
693 time_zone: UTC,
694 }
695 }
696}
697
698fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
699 matches!(
700 sink_name,
701 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
702 )
703}
704pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
705 const SINK_NAME: &'static str;
706
707 type LogSinker: LogSinker;
708
709 fn set_default_commit_checkpoint_interval(
710 desc: &mut SinkDesc,
711 user_specified: &SinkDecouple,
712 ) -> Result<()> {
713 if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
714 match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
715 Some(commit_checkpoint_interval) => {
716 let commit_checkpoint_interval = commit_checkpoint_interval
717 .parse::<u64>()
718 .map_err(|e| SinkError::Config(anyhow!(e)))?;
719 if matches!(user_specified, SinkDecouple::Disable)
720 && commit_checkpoint_interval > 1
721 {
722 return Err(SinkError::Config(anyhow!(
723 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
724 )));
725 }
726 }
727 None => match user_specified {
728 SinkDecouple::Default | SinkDecouple::Enable => {
729 if matches!(Self::SINK_NAME, ICEBERG_SINK) {
730 desc.properties.insert(
731 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
732 ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
733 );
734 } else {
735 desc.properties.insert(
736 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
737 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
738 );
739 }
740 }
741 SinkDecouple::Disable => {
742 desc.properties.insert(
743 COMMIT_CHECKPOINT_INTERVAL.to_owned(),
744 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
745 );
746 }
747 },
748 }
749 }
750 Ok(())
751 }
752
753 fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
755 match user_specified {
756 SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
757 SinkDecouple::Disable => Ok(false),
758 }
759 }
760
761 fn support_schema_change() -> bool {
762 false
763 }
764
765 fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
766 Ok(())
767 }
768
769 async fn validate(&self) -> Result<()>;
770 async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
771
772 fn is_coordinated_sink(&self) -> bool {
773 false
774 }
775
776 async fn new_coordinator(
777 &self,
778 _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
779 ) -> Result<SinkCommitCoordinator> {
780 Err(SinkError::Coordinator(anyhow!("no coordinator")))
781 }
782}
783
/// The log-reading interface exposed to a [`LogSinker`].
pub trait SinkLogReader: Send {
    /// Starts reading, optionally from `start_offset` (semantics of the offset
    /// are defined by the underlying `LogReader`).
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Returns the next item. The `u64` is presumably the item's epoch —
    /// confirm against `LogReader::next_item`.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log up to `offset`.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
800
/// Lets any `&mut R` where `R: LogReader` be used as a [`SinkLogReader`] by
/// forwarding each call to the corresponding `LogReader` method. The calls are
/// fully qualified so they cannot resolve to an inherent method of `R`.
impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}
819
/// Drives a sink by consuming the log.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Consumes `log_reader` and writes to the sink. The `!` success type means
    /// it never completes successfully; it only returns with an error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared callback that subscribes to committed epochs of a sink.
///
/// The resolved tuple is presumably (current committed epoch, receiver of
/// subsequently committed epochs) — confirm with implementors.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
831
/// Commit coordinator returned by `Sink::new_coordinator`. It is either
/// single-phase or two-phase, and boxed.
pub enum SinkCommitCoordinator {
    SinglePhase(BoxSinglePhaseCoordinator),
    TwoPhase(BoxTwoPhaseCoordinator),
}
836
/// Coordinator that commits each epoch's data in a single step.
#[async_trait]
pub trait SinglePhaseCommitCoordinator {
    /// Initializes the coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Commits the writers' `metadata` for `epoch`.
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;

    /// Applies a schema change at `epoch`. The default returns a
    /// [`SinkError::Coordinator`] error naming the implementing type.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for single-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }
}
858
/// Coordinator that commits in two phases: `pre_commit`, then `commit_data`
/// (or `abort`).
#[async_trait]
pub trait TwoPhaseCommitCoordinator {
    /// Initializes the coordinator.
    async fn init(&mut self) -> Result<()>;

    /// First phase for `epoch`. It receives the writers' `metadata` and an
    /// optional schema change.
    ///
    /// The returned bytes are presumably the commit metadata later passed to
    /// `commit_data`/`abort`; `None` presumably means nothing to commit —
    /// confirm with implementors.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>>;

    /// Second phase: commits `commit_metadata` for `epoch`.
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;

    /// Applies a schema change at `epoch`. The default returns a
    /// [`SinkError::Coordinator`] error naming the implementing type.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for two-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }

    /// Aborts the pre-committed `commit_metadata` for `epoch`. This method is
    /// infallible by signature.
    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
}
891
892impl SinkImpl {
893 pub fn new(mut param: SinkParam) -> Result<Self> {
894 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
895
896 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
898
899 let sink_type = param
900 .properties
901 .get(CONNECTOR_TYPE_KEY)
902 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
903
904 let sink_type = sink_type.to_lowercase();
905 match_sink_name_str!(
906 sink_type.as_str(),
907 SinkType,
908 Ok(SinkType::try_from(param)?.into()),
909 |other| {
910 Err(SinkError::Config(anyhow!(
911 "unsupported sink connector {}",
912 other
913 )))
914 }
915 )
916 }
917
918 pub fn is_sink_into_table(&self) -> bool {
919 matches!(self, SinkImpl::Table(_))
920 }
921
922 pub fn is_blackhole(&self) -> bool {
923 matches!(self, SinkImpl::BlackHole(_))
924 }
925
926 pub fn is_coordinated_sink(&self) -> bool {
927 dispatch_sink!(self, sink, sink.is_coordinated_sink())
928 }
929}
930
931pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
932 SinkImpl::new(param)
933}
934
// Generates `SinkImpl` from the `for_all_sinks!` registry. This macro is local
// (not `#[macro_export]`).
// - The no-argument arm feeds the registry back into this macro.
// - The list arm defines the `SinkImpl` enum, with one variant per sink
//   holding `Box<SinkType>`, plus a `From<SinkType> for SinkImpl` impl for
//   each sink.
// `#[derive(Debug)]` requires every registered sink type to implement `Debug`.
macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}
956
// Defines `SinkImpl` and its `From` impls for every sink in `for_all_sinks!`.
def_sink_impl!();
958
/// Result type used throughout the sink module.
pub type Result<T> = std::result::Result<T, SinkError>;
960
/// Errors produced by sinks.
///
/// - Most connector-specific variants wrap an [`anyhow::Error`] as their
///   `#[source]` and carry a backtrace.
/// - Some variants carry a pre-rendered message string.
/// - `Internal` and `Connector` are transparent wrappers.
/// - Variants marked `#[from]` get automatic `From` conversions, so `?` works
///   on those error types.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Invalid or conflicting sink configuration.
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Commit-coordinator failures, including "not supported" defaults.
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Http error: {0}")]
    Http(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Catch-all for any [`anyhow::Error`] (via `#[from]`). It displays the
    /// inner error transparently.
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    /// Transparent wrapper for connector-level errors.
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    /// Failure to resolve secret references.
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Redshift error: {0}")]
    Redshift(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1112
1113#[allow(clippy::disallowed_types)]
1114impl From<::iceberg::Error> for SinkError {
1115 fn from(err: ::iceberg::Error) -> Self {
1116 SinkError::Iceberg(anyhow!(err))
1117 }
1118}
1119
1120impl From<sea_orm::DbErr> for SinkError {
1121 fn from(err: sea_orm::DbErr) -> Self {
1122 SinkError::Iceberg(anyhow!(err))
1123 }
1124}
1125
1126impl From<OpendalError> for SinkError {
1127 fn from(error: OpendalError) -> Self {
1128 SinkError::File(error.to_report_string())
1129 }
1130}
1131
1132impl From<parquet::errors::ParquetError> for SinkError {
1133 fn from(error: parquet::errors::ParquetError) -> Self {
1134 SinkError::File(error.to_report_string())
1135 }
1136}
1137
1138impl From<ArrayError> for SinkError {
1139 fn from(error: ArrayError) -> Self {
1140 SinkError::File(error.to_report_string())
1141 }
1142}
1143
1144impl From<RpcError> for SinkError {
1145 fn from(value: RpcError) -> Self {
1146 SinkError::Remote(anyhow!(value))
1147 }
1148}
1149
1150impl From<RedisError> for SinkError {
1151 fn from(value: RedisError) -> Self {
1152 SinkError::Redis(value.to_report_string())
1153 }
1154}
1155
1156impl From<tiberius::error::Error> for SinkError {
1157 fn from(err: tiberius::error::Error) -> Self {
1158 SinkError::SqlServer(anyhow!(err))
1159 }
1160}
1161
1162impl From<::elasticsearch::Error> for SinkError {
1163 fn from(err: ::elasticsearch::Error) -> Self {
1164 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1165 }
1166}
1167
1168impl From<::opensearch::Error> for SinkError {
1169 fn from(err: ::opensearch::Error) -> Self {
1170 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1171 }
1172}
1173
1174impl From<tokio_postgres::Error> for SinkError {
1175 fn from(err: tokio_postgres::Error) -> Self {
1176 SinkError::Postgres(anyhow!(err))
1177 }
1178}