risingwave_connector/sink/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod big_query;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
pub mod google_pubsub;
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;
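
/// Common imports for sink implementations. A typical (illustrative) usage:
///
/// ```ignore
/// use crate::sink::prelude::*;
/// ```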
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
        SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::clickhouse::error::Error as ClickHouseError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use futures::future::BoxFuture;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::writer::SinkWriter;

const BOUNDED_CHANNEL_SIZE: usize = 16;
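
/// Calls `$macro` with the full registry of sinks as a list of
/// `{ VariantName, SinkType, ConfigType }` triples, followed by any extra
/// arguments. A minimal sketch of a callback consuming the list (the
/// `count_sinks!` macro here is hypothetical, for illustration only):
///
/// ```ignore
/// macro_rules! count_sinks {
///     ({$({ $variant:ident, $sink:ty, $config:ty }),*}) => {
///         // One `stringify!`-ed name per registered sink variant.
///         [$(stringify!($variant)),*].len()
///     };
/// }
/// let num_sinks = crate::for_all_sinks! { count_sinks };
/// ```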
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },
                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}

#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }),*}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}

#[macro_export]
macro_rules! generate_config_use_single {
    // Skip `()` config types.
    (()) => {};

    // Generate a use clause for actual config types.
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}

/// Convenience macro that expands `for_all_sinks!` with `generate_config_use_clauses!`.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}

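/// Matches on a `SinkImpl` value and runs `$body` with the concrete sink bound
/// to `$sink` in every arm. For example, `SinkImpl::is_coordinated_sink` below
/// dispatches to the underlying sink as:
///
/// ```ignore
/// dispatch_sink!(self, sink, sink.is_coordinated_sink())
/// ```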
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}

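/// Matches a connector name string against `<SinkType as Sink>::SINK_NAME` for
/// every registered sink, binding the matching concrete sink type to
/// `$type_name` inside `$body`. For example, `SinkImpl::new` below resolves the
/// `connector` property with:
///
/// ```ignore
/// match_sink_name_str!(
///     sink_type.as_str(),
///     SinkType,
///     Ok(SinkType::try_from(param)?.into()),
///     |other| Err(SinkError::Config(anyhow!("unsupported sink connector {}", other)))
/// )
/// ```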
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

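/// Parameters used to build a sink, convertible to and from `PbSinkParam` so
/// they can be shipped between nodes. The proto round trip is expected to
/// preserve equality; an illustrative sketch (with `param: SinkParam` assumed
/// in scope):
///
/// ```ignore
/// let decoded = SinkParam::from_proto(param.to_proto());
/// assert_eq!(param, decoded); // `SinkParam` derives `PartialEq`
/// ```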
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why do we need these 2 fields (`db_name` and `sink_from_name`)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: table_schema
                .pk_indices
                .iter()
                .map(|i| *i as usize)
                .collect(),
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    // `SinkParam` should only be used when there is a secret context available.
    // FIXME: Use a new type for `SinkFormatDesc` whose properties contain the filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam`, filling the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics.
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics.
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics.
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration the log reader spends waiting for the next future to be created",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // The value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1,
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
    )
}
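
/// The top-level trait that every sink connector implements. A minimal sketch
/// of an implementation (the `NoopSink` / `NoopLogSinker` names are
/// hypothetical, for illustration only; real connectors live in the submodules
/// declared above):
///
/// ```ignore
/// struct NoopSink;
///
/// impl TryFrom<SinkParam> for NoopSink {
///     type Error = SinkError;
///     fn try_from(_param: SinkParam) -> Result<Self> {
///         Ok(NoopSink)
///     }
/// }
///
/// impl Sink for NoopSink {
///     const SINK_NAME: &'static str = "noop"; // matched by `match_sink_name_str!`
///     type LogSinker = NoopLogSinker;
///     type Coordinator = DummySinkCommitCoordinator;
///
///     async fn validate(&self) -> Result<()> {
///         Ok(())
///     }
///
///     async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
///         Ok(NoopLogSinker)
///     }
/// }
///
/// struct NoopLogSinker;
///
/// #[async_trait]
/// impl LogSinker for NoopLogSinker {
///     async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
///         log_reader.start_from(None).await?;
///         loop {
///             let (epoch, item) = log_reader.next_item().await?;
///             // A real sink would write chunks out here; this sketch only
///             // acknowledges barriers so the log store can truncate.
///             if let LogStoreReadItem::Barrier { .. } = item {
///                 log_reader.truncate(TruncateOffset::Barrier { epoch })?;
///             }
///         }
///     }
/// }
/// ```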
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;

    type LogSinker: LogSinker;
    type Coordinator: SinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: a `commit_checkpoint_interval` larger than 1 requires sink decouple to be enabled, but the session config `sink_decouple` is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                        );
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

#[async_trait]
pub trait LogSinker: 'static + Send {
    /// Note: rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}

pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink commit coordinator and return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;

    /// After collecting the metadata from each sink writer, the coordinator calls `commit` with
    /// the set of metadata. Because the metadata is expected to be passed between different gRPC
    /// nodes, this general trait keeps it as serialized bytes.
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}

pub struct DummySinkCommitCoordinator;

#[async_trait]
impl SinkCommitCoordinator for DummySinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
        Ok(())
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // Remove privatelink-related properties, if any.
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

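/// Build a `SinkImpl` from its parameters; a thin wrapper over `SinkImpl::new`.
/// An illustrative sketch (the `param` value is assumed to carry a
/// `connector = "blackhole"` property):
///
/// ```ignore
/// let sink = build_sink(param)?;
/// assert!(sink.is_blackhole());
/// ```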
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<ClickHouseError> for SinkError {
    fn from(value: ClickHouseError) -> Self {
        SinkError::ClickHouse(value.to_report_string())
    }
}

#[cfg(feature = "sink-deltalake")]
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}