risingwave_connector/sink/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14pub mod big_query;
15pub mod boxed;
16pub mod catalog;
17pub mod clickhouse;
18pub mod coordinate;
19pub mod decouple_checkpoint_log_sink;
20pub mod deltalake;
21pub mod doris;
22pub mod doris_starrocks_connector;
23pub mod dynamodb;
24pub mod elasticsearch_opensearch;
25pub mod encoder;
26pub mod file_sink;
27pub mod formatter;
28pub mod google_pubsub;
29pub mod iceberg;
30pub mod kafka;
31pub mod kinesis;
32use risingwave_common::bail;
33pub mod log_store;
34pub mod mock_coordination_client;
35pub mod mongodb;
36pub mod mqtt;
37pub mod nats;
38pub mod postgres;
39pub mod pulsar;
40pub mod redis;
41pub mod remote;
42pub mod sqlserver;
43pub mod starrocks;
44pub mod test_sink;
45pub mod trivial;
46pub mod utils;
47pub mod writer;
/// Re-exports a selection of items from [`crate::sink`] in a single module.
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
        SinkParam, SinkWriterParam,
    };
}
54
55use std::collections::BTreeMap;
56use std::future::Future;
57use std::sync::{Arc, LazyLock};
58
59use ::clickhouse::error::Error as ClickHouseError;
60use ::redis::RedisError;
61use anyhow::anyhow;
62use async_trait::async_trait;
63use clickhouse::CLICKHOUSE_SINK;
64use decouple_checkpoint_log_sink::{
65    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
66    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
67};
68use deltalake::DELTALAKE_SINK;
69use futures::future::BoxFuture;
70use iceberg::ICEBERG_SINK;
71use opendal::Error as OpendalError;
72use prometheus::Registry;
73use risingwave_common::array::ArrayError;
74use risingwave_common::bitmap::Bitmap;
75use risingwave_common::catalog::{ColumnDesc, Field, Schema};
76use risingwave_common::config::StreamingConfig;
77use risingwave_common::hash::ActorId;
78use risingwave_common::metrics::{
79    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
80    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
81};
82use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
83use risingwave_common::secret::{LocalSecretManager, SecretError};
84use risingwave_common::session_config::sink_decouple::SinkDecouple;
85use risingwave_common::{
86    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
87    register_guarded_int_gauge_vec_with_registry,
88};
89use risingwave_pb::catalog::PbSinkType;
90use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
91use risingwave_rpc_client::MetaClient;
92use risingwave_rpc_client::error::RpcError;
93use sea_orm::DatabaseConnection;
94use starrocks::STARROCKS_SINK;
95use thiserror::Error;
96use thiserror_ext::AsReport;
97use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
98pub use tracing;
99
100use self::catalog::{SinkFormatDesc, SinkType};
101use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
102use crate::WithPropertiesExt;
103use crate::connector_common::IcebergSinkCompactionUpdate;
104use crate::error::{ConnectorError, ConnectorResult};
105use crate::sink::catalog::desc::SinkDesc;
106use crate::sink::catalog::{SinkCatalog, SinkId};
107use crate::sink::file_sink::fs::FsSink;
108use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
109use crate::sink::writer::SinkWriter;
110
/// Size constant (16) for a bounded channel.
// NOTE(review): no use of this constant is visible in this part of the file —
// confirm which channel it bounds.
const BOUNDED_CHANNEL_SIZE: usize = 16;
/// Invokes `$macro!` with the full list of sinks, followed by any extra
/// arguments passed after the macro path.
///
/// The list is a brace-delimited, comma-separated sequence of
/// `{ VariantName, SinkType, ConfigType }` entries. Sinks without a config type
/// use `()` in the third position.
///
/// NOTE(review): the `Fs` entry names `FsSink` without a `$crate::` path, unlike
/// every other entry, so expansions presumably need `FsSink` imported at the
/// call site — confirm before relying on this macro from another module.
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
155
/// Callback for `for_all_sinks!`: for each `{ VariantName, SinkType, ConfigType }`
/// entry of the sink list, forwards the config-type tokens to
/// `generate_config_use_single!`.
#[macro_export]
macro_rules! generate_config_use_clauses {
    // The config type is captured as raw token trees so that the `()` placeholder
    // can be matched literally by `generate_config_use_single!`.
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}
164
/// Expands to a `pub(super) use` of one sink config type, or to nothing when the
/// config type is the `()` placeholder.
#[macro_export]
macro_rules! generate_config_use_single {
    // Skip () config types
    (()) => {};

    // Generate use clause for actual config types
    ($config_type:path) => {
        // Not every importing module uses every config type.
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}
176
/// Convenience macro that uses `for_all_sinks!` to run
/// `generate_config_use_clauses!` over the whole sink list, i.e. it emits one
/// `pub(super) use` per sink config type listed there.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
184
/// Runs `$body` against the concrete sink held inside a `SinkImpl` value.
///
/// Call it as `dispatch_sink!(impl_expr, sink_ident, body_expr)`: `sink_ident` is
/// bound to the inner sink in every match arm and `body_expr` is evaluated with
/// that binding in scope.
#[macro_export]
macro_rules! dispatch_sink {
    // Internal arm: receives the sink list produced by `for_all_sinks!` and expands
    // to a `match` with one arm per `SinkImpl` variant.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    // Entry arm: wraps the expressions in braces so they travel through
    // `for_all_sinks!` as single token trees, then re-enters the arm above.
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
200
/// Matches a connector name string against the `SINK_NAME` of every sink listed
/// in `for_all_sinks!`.
///
/// Call it as `match_sink_name_str!(name_expr, TypeName, body_expr, on_other)`:
/// for the matching sink, `TypeName` is declared as a type alias of that sink
/// type and `body_expr` is evaluated; if no sink matches, `on_other` is called
/// with the unmatched name.
#[macro_export]
macro_rules! match_sink_name_str {
    // Internal arm: receives the sink list produced by `for_all_sinks!`.
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    // Entry arm: brace-wraps the expressions and re-enters the arm above through
    // `for_all_sinks!`.
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
221
/// Property key under which the connector name of a sink is stored.
pub const CONNECTOR_TYPE_KEY: &str = "connector";
/// Property key of the sink `type` option; `SinkParam::from_proto` reads it to
/// derive a format description from the legacy connector/type pair.
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
// NOTE(review): the three `SINK_TYPE_*` strings are presumably the accepted values
// of the `type` option — confirm against the per-sink config parsing.
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
/// Property key of the `force_append_only` option.
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
230
/// Parameters describing a sink; convertible to and from `PbSinkParam` and
/// buildable from a `SinkCatalog` (see the `impl` block).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    /// Sink properties keyed by option name (e.g. the `connector` key).
    pub properties: BTreeMap<String, String>,
    /// Column descriptors of the sink's columns.
    pub columns: Vec<ColumnDesc>,
    /// Primary-key indices; serialized as `pk_indices` of the table schema.
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}
249
250impl SinkParam {
251    pub fn from_proto(pb_param: PbSinkParam) -> Self {
252        let table_schema = pb_param.table_schema.expect("should contain table schema");
253        let format_desc = match pb_param.format_desc {
254            Some(f) => f.try_into().ok(),
255            None => {
256                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
257                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
258                match (connector, r#type) {
259                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
260                    _ => None,
261                }
262            }
263        };
264        Self {
265            sink_id: SinkId::from(pb_param.sink_id),
266            sink_name: pb_param.sink_name,
267            properties: pb_param.properties,
268            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
269            downstream_pk: table_schema
270                .pk_indices
271                .iter()
272                .map(|i| *i as usize)
273                .collect(),
274            sink_type: SinkType::from_proto(
275                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
276            ),
277            format_desc,
278            db_name: pb_param.db_name,
279            sink_from_name: pb_param.sink_from_name,
280        }
281    }
282
283    pub fn to_proto(&self) -> PbSinkParam {
284        PbSinkParam {
285            sink_id: self.sink_id.sink_id,
286            sink_name: self.sink_name.clone(),
287            properties: self.properties.clone(),
288            table_schema: Some(TableSchema {
289                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
290                pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
291            }),
292            sink_type: self.sink_type.to_proto().into(),
293            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
294            db_name: self.db_name.clone(),
295            sink_from_name: self.sink_from_name.clone(),
296        }
297    }
298
299    pub fn schema(&self) -> Schema {
300        Schema {
301            fields: self.columns.iter().map(Field::from).collect(),
302        }
303    }
304
305    // `SinkParams` should only be used when there is a secret context.
306    // FIXME: Use a new type for `SinkFormatDesc` with properties contain filled secrets.
307    pub fn fill_secret_for_format_desc(
308        format_desc: Option<SinkFormatDesc>,
309    ) -> Result<Option<SinkFormatDesc>> {
310        match format_desc {
311            Some(mut format_desc) => {
312                format_desc.options = LocalSecretManager::global()
313                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
314                Ok(Some(format_desc))
315            }
316            None => Ok(None),
317        }
318    }
319
320    /// Try to convert a `SinkCatalog` to a `SinkParam` and fill the secrets to properties.
321    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
322        let columns = sink_catalog
323            .visible_columns()
324            .map(|col| col.column_desc.clone())
325            .collect();
326        let properties_with_secret = LocalSecretManager::global()
327            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
328        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
329        Ok(Self {
330            sink_id: sink_catalog.id,
331            sink_name: sink_catalog.name,
332            properties: properties_with_secret,
333            columns,
334            downstream_pk: sink_catalog.downstream_pk,
335            sink_type: sink_catalog.sink_type,
336            format_desc: format_desc_with_secret,
337            db_name: sink_catalog.db_name,
338            sink_from_name: sink_catalog.sink_from_name,
339        })
340    }
341}
342
343pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
344    use crate::enforce_secret::EnforceSecret;
345
346    let connector = props
347        .get_connector()
348        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
349    let key_iter = props.key_iter();
350    match_sink_name_str!(
351        connector.as_str(),
352        PropType,
353        PropType::enforce_secret(key_iter),
354        |other| bail!("connector '{}' is not supported", other)
355    )
356}
357
/// Lazily initialized, process-wide [`SinkMetrics`] registered against
/// `GLOBAL_METRICS_REGISTRY` on first access.
pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));
360
/// Label-guarded metric vectors for sinks, the log store and the iceberg writer.
///
/// Each field is a metric *vector*; a concrete series is obtained by supplying
/// label values (see `SinkWriterMetrics::new` for an example).
#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
}
385
386impl SinkMetrics {
387    pub fn new(registry: &Registry) -> Self {
388        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
389            "sink_commit_duration",
390            "Duration of commit op in sink",
391            &["actor_id", "connector", "sink_id", "sink_name"],
392            registry
393        )
394        .unwrap();
395
396        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
397            "connector_sink_rows_received",
398            "Number of rows received by sink",
399            &["actor_id", "connector_type", "sink_id", "sink_name"],
400            registry
401        )
402        .unwrap();
403
404        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
405            "log_store_first_write_epoch",
406            "The first write epoch of log store",
407            &["actor_id", "sink_id", "sink_name"],
408            registry
409        )
410        .unwrap();
411
412        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
413            "log_store_latest_write_epoch",
414            "The latest write epoch of log store",
415            &["actor_id", "sink_id", "sink_name"],
416            registry
417        )
418        .unwrap();
419
420        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
421            "log_store_write_rows",
422            "The write rate of rows",
423            &["actor_id", "sink_id", "sink_name"],
424            registry
425        )
426        .unwrap();
427
428        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
429            "log_store_latest_read_epoch",
430            "The latest read epoch of log store",
431            &["actor_id", "connector", "sink_id", "sink_name"],
432            registry
433        )
434        .unwrap();
435
436        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
437            "log_store_read_rows",
438            "The read rate of rows",
439            &["actor_id", "connector", "sink_id", "sink_name"],
440            registry
441        )
442        .unwrap();
443
444        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
445            "log_store_read_bytes",
446            "Total size of chunks read by log reader",
447            &["actor_id", "connector", "sink_id", "sink_name"],
448            registry
449        )
450        .unwrap();
451
452        let log_store_reader_wait_new_future_duration_ns =
453            register_guarded_int_counter_vec_with_registry!(
454                "log_store_reader_wait_new_future_duration_ns",
455                "Accumulated duration of LogReader to wait for next call to create future",
456                &["actor_id", "connector", "sink_id", "sink_name"],
457                registry
458            )
459            .unwrap();
460
461        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
462            "iceberg_write_qps",
463            "The qps of iceberg writer",
464            &["actor_id", "sink_id", "sink_name"],
465            registry
466        )
467        .unwrap();
468
469        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
470            "iceberg_write_latency",
471            "The latency of iceberg writer",
472            &["actor_id", "sink_id", "sink_name"],
473            registry
474        )
475        .unwrap();
476
477        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
478            "iceberg_rolling_unflushed_data_file",
479            "The unflushed data file count of iceberg rolling writer",
480            &["actor_id", "sink_id", "sink_name"],
481            registry
482        )
483        .unwrap();
484
485        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
486            "iceberg_position_delete_cache_num",
487            "The delete cache num of iceberg position delete writer",
488            &["actor_id", "sink_id", "sink_name"],
489            registry
490        )
491        .unwrap();
492
493        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
494            "iceberg_partition_num",
495            "The partition num of iceberg partition writer",
496            &["actor_id", "sink_id", "sink_name"],
497            registry
498        )
499        .unwrap();
500
501        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
502            "iceberg_write_bytes",
503            "The write bytes of iceberg writer",
504            &["actor_id", "sink_id", "sink_name"],
505            registry
506        )
507        .unwrap();
508
509        Self {
510            sink_commit_duration,
511            connector_sink_rows_received,
512            log_store_first_write_epoch,
513            log_store_latest_write_epoch,
514            log_store_write_rows,
515            log_store_latest_read_epoch,
516            log_store_read_rows,
517            log_store_read_bytes,
518            log_store_reader_wait_new_future_duration_ns,
519            iceberg_write_qps,
520            iceberg_write_latency,
521            iceberg_rolling_unflushed_data_file,
522            iceberg_position_delete_cache_num,
523            iceberg_partition_num,
524            iceberg_write_bytes,
525        }
526    }
527}
528
/// Parameters handed to a sink when creating its log sinker
/// (see `Sink::new_log_sinker`).
#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. Indicates that the sink will accept data chunks with an extra partition value column.
    // 2. Gives the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    // The following four fields are used as metric label values in
    // `SinkWriterMetrics::new`.
    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}
547
/// Per-writer metric handles: the `sink_commit_duration` and
/// `connector_sink_rows_received` series of `GLOBAL_SINK_METRICS` bound to one
/// writer's label values.
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}
553
554impl SinkWriterMetrics {
555    pub fn new(writer_param: &SinkWriterParam) -> Self {
556        let labels = [
557            &writer_param.actor_id.to_string(),
558            writer_param.connector.as_str(),
559            &writer_param.sink_id.to_string(),
560            writer_param.sink_name.as_str(),
561        ];
562        let sink_commit_duration = GLOBAL_SINK_METRICS
563            .sink_commit_duration
564            .with_guarded_label_values(&labels);
565        let connector_sink_rows_received = GLOBAL_SINK_METRICS
566            .connector_sink_rows_received
567            .with_guarded_label_values(&labels);
568        Self {
569            sink_commit_duration,
570            connector_sink_rows_received,
571        }
572    }
573
574    #[cfg(test)]
575    pub fn for_test() -> Self {
576        Self {
577            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
578            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
579        }
580    }
581}
582
/// Meta client used by sinks: either a real `MetaClient` or a `MockMetaClient`.
#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}
588
589impl SinkMetaClient {
590    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
591        match self {
592            SinkMetaClient::MetaClient(meta_client) => {
593                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
594                    meta_client.sink_coordinate_client().await,
595                )
596            }
597            SinkMetaClient::MockMetaClient(mock_meta_client) => {
598                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
599                    mock_meta_client.sink_coordinate_client(),
600                )
601            }
602        }
603    }
604
605    pub async fn add_sink_fail_evet_log(
606        &self,
607        sink_id: u32,
608        sink_name: String,
609        connector: String,
610        error: String,
611    ) {
612        match self {
613            SinkMetaClient::MetaClient(meta_client) => {
614                match meta_client
615                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
616                    .await
617                {
618                    Ok(_) => {}
619                    Err(e) => {
620                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Fialed to add sink fail event to event log.");
621                    }
622                }
623            }
624            SinkMetaClient::MockMetaClient(_) => {}
625        }
626    }
627}
628
629impl SinkWriterParam {
630    pub fn for_test() -> Self {
631        SinkWriterParam {
632            executor_id: Default::default(),
633            vnode_bitmap: Default::default(),
634            meta_client: Default::default(),
635            extra_partition_col_idx: Default::default(),
636
637            actor_id: 1,
638            sink_id: SinkId::new(1),
639            sink_name: "test_sink".to_owned(),
640            connector: "test_connector".to_owned(),
641            streaming_config: StreamingConfig::default(),
642        }
643    }
644}
645
646fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
647    matches!(
648        sink_name,
649        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
650    )
651}
652pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
653    const SINK_NAME: &'static str;
654
655    type LogSinker: LogSinker;
656    #[expect(deprecated)]
657    type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;
658
659    fn set_default_commit_checkpoint_interval(
660        desc: &mut SinkDesc,
661        user_specified: &SinkDecouple,
662    ) -> Result<()> {
663        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
664            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
665                Some(commit_checkpoint_interval) => {
666                    let commit_checkpoint_interval = commit_checkpoint_interval
667                        .parse::<u64>()
668                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
669                    if matches!(user_specified, SinkDecouple::Disable)
670                        && commit_checkpoint_interval > 1
671                    {
672                        return Err(SinkError::Config(anyhow!(
673                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
674                        )));
675                    }
676                }
677                None => match user_specified {
678                    SinkDecouple::Default | SinkDecouple::Enable => {
679                        desc.properties.insert(
680                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
681                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
682                        );
683                    }
684                    SinkDecouple::Disable => {
685                        desc.properties.insert(
686                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
687                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
688                        );
689                    }
690                },
691            }
692        }
693        Ok(())
694    }
695
696    /// `user_specified` is the value of `sink_decouple` config.
697    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
698        match user_specified {
699            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
700            SinkDecouple::Disable => Ok(false),
701        }
702    }
703
704    fn support_schema_change() -> bool {
705        false
706    }
707
708    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
709        Ok(())
710    }
711
712    async fn validate(&self) -> Result<()>;
713    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;
714
715    fn is_coordinated_sink(&self) -> bool {
716        false
717    }
718
719    async fn new_coordinator(
720        &self,
721        _db: DatabaseConnection,
722        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
723    ) -> Result<Self::Coordinator> {
724        Err(SinkError::Coordinator(anyhow!("no coordinator")))
725    }
726}
727
/// Reader interface handed to a [`LogSinker`] for consuming a log.
///
/// Every `&mut R` with `R: LogReader` implements it by forwarding to the
/// `LogReader` methods of the same names.
pub trait SinkLogReader: Send {
    /// Starts reading from `start_offset`.
    // NOTE(review): the meaning of `None` is not visible here — presumably "use the
    // reader's default start position"; confirm against `LogReader::start_from`.
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}
744
745impl<R: LogReader> SinkLogReader for &mut R {
746    fn next_item(
747        &mut self,
748    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
749        <R as LogReader>::next_item(*self)
750    }
751
752    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
753        <R as LogReader>::truncate(*self, offset)
754    }
755
756    fn start_from(
757        &mut self,
758        start_offset: Option<u64>,
759    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
760        <R as LogReader>::start_from(*self, start_offset)
761    }
762}
763
/// Consumes a log and writes its content to the external sink.
#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: Please rebuild the log reader's read stream before consuming the log store.
    /// Consumes `log_reader` and sinks what it reads.
    ///
    /// The success type is `!`, so this method only ever returns with an error.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
/// Shared, thread-safe callback that takes a `SinkId` and returns a boxed future
/// resolving to a `u64` together with an unbounded receiver of `u64`s.
// NOTE(review): judging by the name, the pair is presumably (an already committed
// epoch, a stream of subsequently committed epochs) — confirm with the implementor.
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;
775
/// Coordinator that commits the metadata collected from the writers of a sink.
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink committer coordinator, return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// After collecting the metadata from each sink writer, a coordinator will call `commit` with
    /// the set of metadata. The metadata is serialized into bytes, because the metadata is expected
    /// to be passed between different gRPC nodes, so in this general trait, the metadata is
    /// serialized bytes.
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}
786
787#[deprecated]
788/// A place holder struct of `SinkCommitCoordinator` for sink without coordinator.
789///
790/// It can never be constructed because it holds a never type, and therefore it's safe to
791/// mark all its methods as unreachable.
792///
793/// Explicitly mark this struct as `deprecated` so that when developers accidentally declare it explicitly as
794/// the associated type `Coordinator` when implementing `Sink` trait, they can be warned, and remove the explicit
795/// declaration.
796///
797/// Note:
798///     When we implement a sink without coordinator, don't explicitly write `type Coordinator = NoSinkCommitCoordinator`.
799///     Just remove the explicit declaration and use the default associated type, and besides, don't explicitly implement
800///     `fn new_coordinator(...)` and use the default implementation.
801pub struct NoSinkCommitCoordinator(!);
802
803#[expect(deprecated)]
804#[async_trait]
805impl SinkCommitCoordinator for NoSinkCommitCoordinator {
806    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
807        unreachable!()
808    }
809
810    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
811        unreachable!()
812    }
813}
814
815impl SinkImpl {
816    pub fn new(mut param: SinkParam) -> Result<Self> {
817        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";
818
819        // remove privatelink related properties if any
820        param.properties.remove(PRIVATE_LINK_TARGET_KEY);
821
822        let sink_type = param
823            .properties
824            .get(CONNECTOR_TYPE_KEY)
825            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;
826
827        let sink_type = sink_type.to_lowercase();
828        match_sink_name_str!(
829            sink_type.as_str(),
830            SinkType,
831            Ok(SinkType::try_from(param)?.into()),
832            |other| {
833                Err(SinkError::Config(anyhow!(
834                    "unsupported sink connector {}",
835                    other
836                )))
837            }
838        )
839    }
840
841    pub fn is_sink_into_table(&self) -> bool {
842        matches!(self, SinkImpl::Table(_))
843    }
844
845    pub fn is_blackhole(&self) -> bool {
846        matches!(self, SinkImpl::BlackHole(_))
847    }
848
849    pub fn is_coordinated_sink(&self) -> bool {
850        dispatch_sink!(self, sink, sink.is_coordinated_sink())
851    }
852}
853
854pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
855    SinkImpl::new(param)
856}
857
858macro_rules! def_sink_impl {
859    () => {
860        $crate::for_all_sinks! { def_sink_impl }
861    };
862    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
863        #[derive(Debug)]
864        pub enum SinkImpl {
865            $(
866                $variant_name(Box<$sink_type>),
867            )*
868        }
869
870        $(
871            impl From<$sink_type> for SinkImpl {
872                fn from(sink: $sink_type) -> SinkImpl {
873                    SinkImpl::$variant_name(Box::new(sink))
874                }
875            }
876        )*
877    };
878}
879
880def_sink_impl!();
881
882pub type Result<T> = std::result::Result<T, SinkError>;
883
884#[derive(Error, Debug)]
885pub enum SinkError {
886    #[error("Kafka error: {0}")]
887    Kafka(#[from] rdkafka::error::KafkaError),
888    #[error("Kinesis error: {0}")]
889    Kinesis(
890        #[source]
891        #[backtrace]
892        anyhow::Error,
893    ),
894    #[error("Remote sink error: {0}")]
895    Remote(
896        #[source]
897        #[backtrace]
898        anyhow::Error,
899    ),
900    #[error("Encode error: {0}")]
901    Encode(String),
902    #[error("Avro error: {0}")]
903    Avro(#[from] apache_avro::Error),
904    #[error("Iceberg error: {0}")]
905    Iceberg(
906        #[source]
907        #[backtrace]
908        anyhow::Error,
909    ),
910    #[error("config error: {0}")]
911    Config(
912        #[source]
913        #[backtrace]
914        anyhow::Error,
915    ),
916    #[error("coordinator error: {0}")]
917    Coordinator(
918        #[source]
919        #[backtrace]
920        anyhow::Error,
921    ),
922    #[error("ClickHouse error: {0}")]
923    ClickHouse(String),
924    #[error("Redis error: {0}")]
925    Redis(String),
926    #[error("Mqtt error: {0}")]
927    Mqtt(
928        #[source]
929        #[backtrace]
930        anyhow::Error,
931    ),
932    #[error("Nats error: {0}")]
933    Nats(
934        #[source]
935        #[backtrace]
936        anyhow::Error,
937    ),
938    #[error("Google Pub/Sub error: {0}")]
939    GooglePubSub(
940        #[source]
941        #[backtrace]
942        anyhow::Error,
943    ),
944    #[error("Doris/Starrocks connect error: {0}")]
945    DorisStarrocksConnect(
946        #[source]
947        #[backtrace]
948        anyhow::Error,
949    ),
950    #[error("Doris error: {0}")]
951    Doris(String),
952    #[error("DeltaLake error: {0}")]
953    DeltaLake(
954        #[source]
955        #[backtrace]
956        anyhow::Error,
957    ),
958    #[error("ElasticSearch/OpenSearch error: {0}")]
959    ElasticSearchOpenSearch(
960        #[source]
961        #[backtrace]
962        anyhow::Error,
963    ),
964    #[error("Starrocks error: {0}")]
965    Starrocks(String),
966    #[error("File error: {0}")]
967    File(String),
968    #[error("Pulsar error: {0}")]
969    Pulsar(
970        #[source]
971        #[backtrace]
972        anyhow::Error,
973    ),
974    #[error(transparent)]
975    Internal(
976        #[from]
977        #[backtrace]
978        anyhow::Error,
979    ),
980    #[error("BigQuery error: {0}")]
981    BigQuery(
982        #[source]
983        #[backtrace]
984        anyhow::Error,
985    ),
986    #[error("DynamoDB error: {0}")]
987    DynamoDb(
988        #[source]
989        #[backtrace]
990        anyhow::Error,
991    ),
992    #[error("SQL Server error: {0}")]
993    SqlServer(
994        #[source]
995        #[backtrace]
996        anyhow::Error,
997    ),
998    #[error("Postgres error: {0}")]
999    Postgres(
1000        #[source]
1001        #[backtrace]
1002        anyhow::Error,
1003    ),
1004    #[error(transparent)]
1005    Connector(
1006        #[from]
1007        #[backtrace]
1008        ConnectorError,
1009    ),
1010    #[error("Secret error: {0}")]
1011    Secret(
1012        #[from]
1013        #[backtrace]
1014        SecretError,
1015    ),
1016    #[error("Mongodb error: {0}")]
1017    Mongodb(
1018        #[source]
1019        #[backtrace]
1020        anyhow::Error,
1021    ),
1022}
1023
1024impl From<sea_orm::DbErr> for SinkError {
1025    fn from(err: sea_orm::DbErr) -> Self {
1026        SinkError::Iceberg(anyhow!(err))
1027    }
1028}
1029
1030impl From<OpendalError> for SinkError {
1031    fn from(error: OpendalError) -> Self {
1032        SinkError::File(error.to_report_string())
1033    }
1034}
1035
1036impl From<parquet::errors::ParquetError> for SinkError {
1037    fn from(error: parquet::errors::ParquetError) -> Self {
1038        SinkError::File(error.to_report_string())
1039    }
1040}
1041
1042impl From<ArrayError> for SinkError {
1043    fn from(error: ArrayError) -> Self {
1044        SinkError::File(error.to_report_string())
1045    }
1046}
1047
1048impl From<RpcError> for SinkError {
1049    fn from(value: RpcError) -> Self {
1050        SinkError::Remote(anyhow!(value))
1051    }
1052}
1053
1054impl From<ClickHouseError> for SinkError {
1055    fn from(value: ClickHouseError) -> Self {
1056        SinkError::ClickHouse(value.to_report_string())
1057    }
1058}
1059
/// Maps a Delta table error onto [`SinkError::DeltaLake`].
///
/// Only compiled when the `sink-deltalake` feature is enabled.
#[cfg(feature = "sink-deltalake")]
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(delta_err: ::deltalake::DeltaTableError) -> Self {
        Self::DeltaLake(anyhow!(delta_err))
    }
}
1066
1067impl From<RedisError> for SinkError {
1068    fn from(value: RedisError) -> Self {
1069        SinkError::Redis(value.to_report_string())
1070    }
1071}
1072
1073impl From<tiberius::error::Error> for SinkError {
1074    fn from(err: tiberius::error::Error) -> Self {
1075        SinkError::SqlServer(anyhow!(err))
1076    }
1077}
1078
1079impl From<::elasticsearch::Error> for SinkError {
1080    fn from(err: ::elasticsearch::Error) -> Self {
1081        SinkError::ElasticSearchOpenSearch(anyhow!(err))
1082    }
1083}
1084
1085impl From<::opensearch::Error> for SinkError {
1086    fn from(err: ::opensearch::Error) -> Self {
1087        SinkError::ElasticSearchOpenSearch(anyhow!(err))
1088    }
1089}
1090
1091impl From<tokio_postgres::Error> for SinkError {
1092    fn from(err: tokio_postgres::Error) -> Self {
1093        SinkError::Postgres(anyhow!(err))
1094    }
1095}