1feature_gated_sink_mod!(big_query, "bigquery");
16pub mod boxed;
17pub mod catalog;
18feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
19pub mod coordinate;
20pub mod decouple_checkpoint_log_sink;
21feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
22feature_gated_sink_mod!(doris, "doris");
23#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
24pub mod doris_starrocks_connector;
25pub mod dynamodb;
26pub mod elasticsearch_opensearch;
27pub mod encoder;
28pub mod file_sink;
29pub mod formatter;
30feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
31pub mod http;
32pub mod iceberg;
33pub mod kafka;
34pub mod kinesis;
35use risingwave_common::bail;
36use risingwave_pb::stream_plan::PbSinkSchemaChange;
37pub mod jdbc_jni_client;
38pub mod log_store;
39pub mod mock_coordination_client;
40pub mod mongodb;
41pub mod mqtt;
42pub mod nats;
43pub mod postgres;
44pub mod pulsar;
45pub mod redis;
46pub mod remote;
47pub mod snowflake_redshift;
48pub mod sqlserver;
49feature_gated_sink_mod!(starrocks, "starrocks");
50pub mod test_sink;
51pub mod trivial;
52pub mod utils;
53pub mod writer;
/// Convenience re-exports for sink implementations: the shared result type,
/// the core sink traits/params, and the most commonly used option constants.
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}
60
61use std::collections::BTreeMap;
62use std::future::Future;
63use std::sync::{Arc, LazyLock};
64
65use ::redis::RedisError;
66use anyhow::anyhow;
67use async_trait::async_trait;
68use decouple_checkpoint_log_sink::{
69 COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
70 DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
71};
72use futures::future::BoxFuture;
73use opendal::Error as OpendalError;
74use prometheus::Registry;
75use risingwave_common::array::ArrayError;
76use risingwave_common::bitmap::Bitmap;
77use risingwave_common::catalog::{ColumnDesc, Field, Schema};
78use risingwave_common::config::StreamingConfig;
79use risingwave_common::hash::ActorId;
80use risingwave_common::metrics::{
81 LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
82 LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
83};
84use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
85use risingwave_common::secret::{LocalSecretManager, SecretError};
86use risingwave_common::session_config::sink_decouple::SinkDecouple;
87use risingwave_common::{
88 register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
89 register_guarded_int_gauge_vec_with_registry,
90};
91use risingwave_pb::catalog::PbSinkType;
92use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
93use risingwave_pb::id::ExecutorId;
94use risingwave_rpc_client::MetaClient;
95use risingwave_rpc_client::error::RpcError;
96use starrocks::STARROCKS_SINK;
97use thiserror::Error;
98use thiserror_ext::AsReport;
99use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
100pub use tracing;
101
102use self::catalog::{SinkFormatDesc, SinkType};
103use self::clickhouse::CLICKHOUSE_SINK;
104use self::deltalake::DELTALAKE_SINK;
105use self::iceberg::ICEBERG_SINK;
106use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
107use crate::WithPropertiesExt;
108use crate::connector_common::IcebergSinkCompactionUpdate;
109use crate::error::{ConnectorError, ConnectorResult};
110use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
111use crate::sink::catalog::desc::SinkDesc;
112use crate::sink::catalog::{SinkCatalog, SinkId};
113use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
114use crate::sink::file_sink::fs::FsSink;
115use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
116use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
117use crate::sink::utils::feature_gated_sink_mod;
118
// Capacity for internal bounded channels; consumers are outside this chunk.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// The single registry of all supported sinks, as
/// `{ VariantName, SinkType, ConfigType }` triples (`()` when a sink has no
/// dedicated config struct).
///
/// Invokes `$macro` with the whole list (plus any extra `$arg`s), so the
/// other macros here (`dispatch_sink!`, `match_sink_name_str!`,
/// `def_sink_impl!`, …) all derive from this one. Adding a new sink only
/// requires a new entry in this list.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Http, $crate::sink::http::HttpSink, $crate::sink::http::HttpConfig },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
166
/// Given the sink registry expanded by [`for_all_sinks!`], emits one
/// [`generate_config_use_single!`] invocation per sink's config type.
#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
175
/// Emits a `pub(super) use` for one sink config type; the `()` arm skips
/// sinks that have no dedicated config struct.
#[macro_export]
macro_rules! generate_config_use_single {
    // No config type for this sink: expand to nothing.
    (()) => {};

    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
187
/// Imports every registered sink's config type into the calling scope.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
195
/// Matches a [`SinkImpl`] value and evaluates `$body` with `$sink` bound to
/// the boxed concrete sink of whichever variant matched.
///
/// The first (internal) arm is what [`for_all_sinks!`] calls back with the
/// expanded registry; user code uses the `($impl, $sink, $body)` form.
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
211
/// Matches a connector-name string against every registered sink's
/// `SINK_NAME`, binding the matching sink type to the `$type_name` alias
/// inside `$body`; unknown names are handed to `$on_other_closure`.
///
/// The first (internal) arm is what [`for_all_sinks!`] calls back with the
/// expanded registry; user code uses the 4-argument form.
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
232
/// WITH-clause key selecting the sink connector.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// WITH-clause key for the sink's `type` option.
pub const SINK_TYPE_OPTION: &str = "type";
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// Recognized values of the `type` option.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
// User-settable boolean options (see `SinkParam::ignore_delete` for the first).
pub const SINK_USER_IGNORE_DELETE_OPTION: &str = "ignore_delete";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";
246
/// Runtime parameters of a sink; converted to and from `PbSinkParam`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Connector properties from the WITH clause (secrets already resolved
    /// when built via `try_from_sink_catalog`).
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    /// Indices of the downstream primary-key columns; `None` when no PK is
    /// specified (serialized as empty `pk_indices` in protobuf).
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    pub ignore_delete: bool,
    /// Format/encode description; may be derived from the legacy
    /// `connector` + `type` properties (see `from_proto`).
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    pub sink_from_name: String,
}
268
impl SinkParam {
    /// Builds a `SinkParam` from its protobuf representation.
    ///
    /// When `format_desc` is absent, falls back to deriving one from the
    /// legacy `connector` + `type` properties; conversion failures on either
    /// path yield `None` rather than an error (best effort).
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let ignore_delete = pb_param.ignore_delete();
        // The schema is mandatory; a missing one is a caller bug.
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                // Legacy path: derive the format from `connector` + `type`.
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            // Empty `pk_indices` on the wire encodes "no downstream PK".
            downstream_pk: if table_schema.pk_indices.is_empty() {
                None
            } else {
                Some(
                    (table_schema.pk_indices.iter())
                        .map(|i| *i as usize)
                        .collect(),
                )
            },
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            ignore_delete,
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    /// Serializes to protobuf; inverse of [`Self::from_proto`].
    /// `downstream_pk: None` round-trips as an empty `pk_indices` list.
    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: (self.downstream_pk.as_ref())
                    .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
            raw_ignore_delete: self.ignore_delete,
        }
    }

    /// The sink's schema, built from its column descriptors.
    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    /// Downstream primary-key indices, or an empty vector when none are
    /// defined. Note: clones the stored indices.
    pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
        self.downstream_pk.clone().unwrap_or_default()
    }

    /// Resolves secret references inside `format_desc.options` into their
    /// actual values via the global secret manager.
    ///
    /// # Errors
    /// Propagates the secret manager's error when a reference cannot be
    /// filled (surfaces as `SinkError::Secret`).
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Builds a `SinkParam` from a catalog entry: keeps only visible
    /// columns and resolves secret references in both the connector
    /// properties and the format description.
    ///
    /// # Errors
    /// Propagates secret-resolution failures.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            ignore_delete: sink_catalog.ignore_delete,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}
379
380pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
381 use crate::enforce_secret::EnforceSecret;
382
383 let connector = props
384 .get_connector()
385 .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
386 let key_iter = props.key_iter();
387 match_sink_name_str!(
388 connector.as_str(),
389 PropType,
390 PropType::enforce_secret(key_iter),
391 |other| bail!("connector '{}' is not supported", other)
392 )
393}
394
/// Process-wide sink metrics, registered lazily in the global registry.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
397
/// Label-guarded Prometheus metric vectors for sinks. See [`SinkMetrics::new`]
/// for each metric's name, help text, and label set.
#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log-store write side.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log-store read side.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg-specific writer metrics.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}
423
impl SinkMetrics {
    /// Registers every sink metric in `registry` and returns the handles.
    ///
    /// # Panics
    /// Panics (`unwrap`) if any registration fails — e.g. a metric with the
    /// same name is already registered. Expected to run once per registry
    /// (see [`GLOBAL_SINK_METRICS`]).
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        // Note: labelled differently from the other iceberg metrics
        // (per table, not per actor).
        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}
575
/// Per-writer construction parameters handed to `Sink::new_log_sinker`.
#[derive(Clone)]
pub struct SinkWriterParam {
    pub executor_id: ExecutorId,
    /// Vnode bitmap for this writer, if any.
    pub vnode_bitmap: Option<Bitmap>,
    /// Meta client for coordination/event reporting; `None` in some tests.
    pub meta_client: Option<SinkMetaClient>,
    pub extra_partition_col_idx: Option<usize>,

    // Identification used for metric labels (see `SinkWriterMetrics::new`).
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}
594
/// Per-writer metric handles resolved from [`GLOBAL_SINK_METRICS`].
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
600
601impl SinkWriterMetrics {
602 pub fn new(writer_param: &SinkWriterParam) -> Self {
603 let labels = [
604 &writer_param.actor_id.to_string(),
605 writer_param.connector.as_str(),
606 &writer_param.sink_id.to_string(),
607 writer_param.sink_name.as_str(),
608 ];
609 let sink_commit_duration = GLOBAL_SINK_METRICS
610 .sink_commit_duration
611 .with_guarded_label_values(&labels);
612 let connector_sink_rows_received = GLOBAL_SINK_METRICS
613 .connector_sink_rows_received
614 .with_guarded_label_values(&labels);
615 Self {
616 sink_commit_duration,
617 connector_sink_rows_received,
618 }
619 }
620
621 #[cfg(test)]
622 pub fn for_test() -> Self {
623 Self {
624 sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
625 connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
626 }
627 }
628}
629
/// Meta-service client used by sinks: a real RPC client or a mock for tests.
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
635
636impl SinkMetaClient {
637 pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
638 match self {
639 SinkMetaClient::MetaClient(meta_client) => {
640 SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
641 meta_client.sink_coordinate_client().await,
642 )
643 }
644 SinkMetaClient::MockMetaClient(mock_meta_client) => {
645 SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
646 mock_meta_client.sink_coordinate_client(),
647 )
648 }
649 }
650 }
651
652 pub async fn add_sink_fail_evet_log(
653 &self,
654 sink_id: SinkId,
655 sink_name: String,
656 connector: String,
657 error: String,
658 ) {
659 match self {
660 SinkMetaClient::MetaClient(meta_client) => {
661 match meta_client
662 .add_sink_fail_evet(sink_id, sink_name, connector, error)
663 .await
664 {
665 Ok(_) => {}
666 Err(e) => {
667 tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
668 }
669 }
670 }
671 SinkMetaClient::MockMetaClient(_) => {}
672 }
673 }
674}
675
676impl SinkWriterParam {
677 pub fn for_test() -> Self {
678 SinkWriterParam {
679 executor_id: Default::default(),
680 vnode_bitmap: Default::default(),
681 meta_client: Default::default(),
682 extra_partition_col_idx: Default::default(),
683
684 actor_id: 1.into(),
685 sink_id: SinkId::new(1),
686 sink_name: "test_sink".to_owned(),
687 connector: "test_connector".to_owned(),
688 streaming_config: StreamingConfig::default(),
689 }
690 }
691}
692
693fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
694 matches!(
695 sink_name,
696 ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
697 )
698}
/// The contract every sink connector implements.
///
/// A sink is constructed from [`SinkParam`] (via `TryFrom`), validated, and
/// then asked for a [`Self::LogSinker`] that performs the actual writing.
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    /// Connector name as used in the `connector` WITH option.
    const SINK_NAME: &'static str;

    /// The runnable writer type that drains the log store into the external
    /// system.
    type LogSinker: LogSinker;

    /// For sinks that support it, fills in a default
    /// `commit_checkpoint_interval` property and rejects configurations that
    /// conflict with the session's sink-decouple setting. No-op for other
    /// sinks.
    ///
    /// # Errors
    /// `SinkError::Config` when the property is not a valid `u64`, or when
    /// an interval > 1 is combined with sink decouple disabled.
    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                // User provided a value: validate it against the decouple setting.
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                // No value: pick a default based on the decouple setting.
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        // Iceberg has its own default interval.
                        if matches!(Self::SINK_NAME, ICEBERG_SINK) {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            );
                        } else {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                            );
                        }
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// Resolves the effective sink-decouple setting: enabled unless the user
    /// explicitly disabled it.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    /// Whether this sink can apply upstream schema changes. Default: no.
    fn support_schema_change() -> bool {
        false
    }

    /// Validates a proposed `ALTER` of the sink config. Default: accept all.
    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    /// Validates the sink's configuration (semantics per connector).
    async fn validate(&self) -> Result<()>;
    /// Creates the writer that consumes the log store and writes downstream.
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    /// Whether commits require a coordinator. Default: no.
    fn is_coordinated_sink(&self) -> bool {
        false
    }

    /// Creates the commit coordinator for coordinated sinks.
    /// Default: errors with `SinkError::Coordinator`.
    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}
778
/// Read interface a sink consumes while draining the log store.
pub trait SinkLogReader: Send {
    /// Starts (or restarts) reading from `start_offset`; semantics follow
    /// [`LogReader::start_from`].
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Yields the next item together with its epoch.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Truncates the log store at `offset` (consumed data may be reclaimed).
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
795
796impl<R: LogReader> SinkLogReader for &mut R {
797 fn next_item(
798 &mut self,
799 ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
800 <R as LogReader>::next_item(*self)
801 }
802
803 fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
804 <R as LogReader>::truncate(*self, offset)
805 }
806
807 fn start_from(
808 &mut self,
809 start_offset: Option<u64>,
810 ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
811 <R as LogReader>::start_from(*self, start_offset)
812 }
813}
814
/// A running sink writer: consumes items from the log reader indefinitely.
#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Never returns normally (`!`); terminates only with an error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Callback resolving, per sink id, a `(u64, receiver)` pair — presumably
/// the current committed epoch plus a stream of subsequently committed
/// epochs; confirm at call sites.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
826
/// A boxed commit coordinator: either single-phase or two-phase commit.
pub enum SinkCommitCoordinator {
    SinglePhase(BoxSinglePhaseCoordinator),
    TwoPhase(BoxTwoPhaseCoordinator),
}
831
/// Coordinator that commits each epoch's metadata in a single step.
#[async_trait]
pub trait SinglePhaseCommitCoordinator {
    /// Called once before any commit.
    async fn init(&mut self) -> Result<()>;

    /// Commits `metadata` for `epoch`.
    async fn commit_data(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;

    /// Applies a schema change at `epoch`. Default: unsupported — returns a
    /// `SinkError::Coordinator` naming the concrete type.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for single-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }
}
853
/// Coordinator that commits in two phases: `pre_commit` prepares, then
/// `commit_data` finalizes, with `abort` as the compensation path.
#[async_trait]
pub trait TwoPhaseCommitCoordinator {
    /// Called once before any commit.
    async fn init(&mut self) -> Result<()>;

    /// Phase one: prepares the commit for `epoch`, optionally alongside a
    /// schema change. Returns opaque commit metadata to feed into
    /// `commit_data` or `abort`; `None` presumably means nothing to commit —
    /// confirm with implementors.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        schema_change: Option<PbSinkSchemaChange>,
    ) -> Result<Option<Vec<u8>>>;

    /// Phase two: finalizes the commit prepared for `epoch`.
    async fn commit_data(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;

    /// Applies a schema change at `epoch`. Default: unsupported — returns a
    /// `SinkError::Coordinator` naming the concrete type.
    async fn commit_schema_change(
        &mut self,
        _epoch: u64,
        _schema_change: PbSinkSchemaChange,
    ) -> Result<()> {
        Err(SinkError::Coordinator(anyhow!(
            "Schema change is not implemented for two-phase commit coordinator {}",
            std::any::type_name::<Self>()
        )))
    }

    /// Rolls back a prepared-but-uncommitted epoch. Infallible by signature;
    /// implementors handle internal errors themselves.
    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
}
886
887impl SinkImpl {
888 pub fn new(mut param: SinkParam) -> Result<Self> {
889 const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
890
891 param.properties.remove(PRIVATE_LINK_TARGET_KEY);
893
894 let sink_type = param
895 .properties
896 .get(CONNECTOR_TYPE_KEY)
897 .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
898
899 let sink_type = sink_type.to_lowercase();
900 match_sink_name_str!(
901 sink_type.as_str(),
902 SinkType,
903 Ok(SinkType::try_from(param)?.into()),
904 |other| {
905 Err(SinkError::Config(anyhow!(
906 "unsupported sink connector {}",
907 other
908 )))
909 }
910 )
911 }
912
913 pub fn is_sink_into_table(&self) -> bool {
914 matches!(self, SinkImpl::Table(_))
915 }
916
917 pub fn is_blackhole(&self) -> bool {
918 matches!(self, SinkImpl::BlackHole(_))
919 }
920
921 pub fn is_coordinated_sink(&self) -> bool {
922 dispatch_sink!(self, sink, sink.is_coordinated_sink())
923 }
924}
925
/// Thin entry point: builds a [`SinkImpl`] from the given parameters.
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
929
/// Defines the `SinkImpl` enum with one boxed variant per registered sink,
/// plus a `From<SinkType>` conversion for each. Invoked once below via
/// [`for_all_sinks!`].
macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                // One boxed variant per sink.
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
953
/// Result type shared by all sink operations.
pub type Result<T> = std::result::Result<T, SinkError>;
955
/// The error type for sink operations. Most variants wrap `anyhow::Error`
/// to attach per-connector context; `Internal` and `Connector` are the
/// catch-all `#[from]` conversions.
#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Http error: {0}")]
    Http(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Redshift error: {0}")]
    Redshift(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}
1107
1108impl From<sea_orm::DbErr> for SinkError {
1109 fn from(err: sea_orm::DbErr) -> Self {
1110 SinkError::Iceberg(anyhow!(err))
1111 }
1112}
1113
1114impl From<OpendalError> for SinkError {
1115 fn from(error: OpendalError) -> Self {
1116 SinkError::File(error.to_report_string())
1117 }
1118}
1119
1120impl From<parquet::errors::ParquetError> for SinkError {
1121 fn from(error: parquet::errors::ParquetError) -> Self {
1122 SinkError::File(error.to_report_string())
1123 }
1124}
1125
1126impl From<ArrayError> for SinkError {
1127 fn from(error: ArrayError) -> Self {
1128 SinkError::File(error.to_report_string())
1129 }
1130}
1131
1132impl From<RpcError> for SinkError {
1133 fn from(value: RpcError) -> Self {
1134 SinkError::Remote(anyhow!(value))
1135 }
1136}
1137
1138impl From<RedisError> for SinkError {
1139 fn from(value: RedisError) -> Self {
1140 SinkError::Redis(value.to_report_string())
1141 }
1142}
1143
1144impl From<tiberius::error::Error> for SinkError {
1145 fn from(err: tiberius::error::Error) -> Self {
1146 SinkError::SqlServer(anyhow!(err))
1147 }
1148}
1149
1150impl From<::elasticsearch::Error> for SinkError {
1151 fn from(err: ::elasticsearch::Error) -> Self {
1152 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1153 }
1154}
1155
1156impl From<::opensearch::Error> for SinkError {
1157 fn from(err: ::opensearch::Error) -> Self {
1158 SinkError::ElasticSearchOpenSearch(anyhow!(err))
1159 }
1160}
1161
1162impl From<tokio_postgres::Error> for SinkError {
1163 fn from(err: tokio_postgres::Error) -> Self {
1164 SinkError::Postgres(anyhow!(err))
1165 }
1166}