risingwave_connector/sink/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod big_query;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
pub mod google_pubsub;
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::clickhouse::error::Error as ClickHouseError;
use ::deltalake::DeltaTableError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use futures::future::BoxFuture;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::writer::SinkWriter;

const BOUNDED_CHANNEL_SIZE: usize = 16;

#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink },
                { Kafka, $crate::sink::kafka::KafkaSink },
                { Pulsar, $crate::sink::pulsar::PulsarSink },
                { BlackHole, $crate::sink::trivial::BlackHoleSink },
                { Kinesis, $crate::sink::kinesis::KinesisSink },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
                { Iceberg, $crate::sink::iceberg::IcebergSink },
                { Mqtt, $crate::sink::mqtt::MqttSink },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink },
                { Nats, $crate::sink::nats::NatsSink },
                { Jdbc, $crate::sink::remote::JdbcSink },
                // { ElasticSearchJava, $crate::sink::remote::ElasticSearchJavaSink },
                // { OpensearchJava, $crate::sink::remote::OpenSearchJavaSink },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink },
                { Cassandra, $crate::sink::remote::CassandraSink },
                { Doris, $crate::sink::doris::DorisSink },
                { Starrocks, $crate::sink::starrocks::StarrocksSink },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink> },
                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink> },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink> },
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink> },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink> },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
                { BigQuery, $crate::sink::big_query::BigQuerySink },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink },
                { Mongodb, $crate::sink::mongodb::MongodbSink },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink },
                { Postgres, $crate::sink::postgres::PostgresSink },

                { Test, $crate::sink::test_sink::TestSink },
                { Table, $crate::sink::trivial::TableSink }
            }
            $(,$arg)*
        }
    };
}
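
// A minimal sketch of how a callback macro consumes the `{ { Variant, Type }, .. }`
// list above; the macro name `count_sinks` is hypothetical and not part of this module:
//
//     macro_rules! count_sinks {
//         ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
//             const SINK_COUNT: usize = [$(stringify!($variant_name)),*].len();
//         };
//     }
//     for_all_sinks! { count_sinks }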

#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
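
// Usage sketch: `$sink` is bound to the boxed concrete sink in every match arm,
// so one body can call trait methods across all variants (see
// `SinkImpl::is_coordinated_sink` below for a real call site):
//
//     fn coordinated(sink_impl: &SinkImpl) -> bool {
//         dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink())
//     }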

#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
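
// Usage sketch: resolve a connector name to its sink type at compile time;
// inside the body, `$type_name` is a local alias for the matched type (the
// literal "kafka" here is illustrative):
//
//     let name = match_sink_name_str!(
//         "kafka",
//         SinkType,
//         SinkType::SINK_NAME,
//         |other| panic!("unsupported sink connector {}", other)
//     );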

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
pub const SINK_WITHOUT_BACKFILL: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why do we need these two fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: table_schema
                .pk_indices
                .iter()
                .map(|i| *i as usize)
                .collect(),
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` with properties containing filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam` and fill the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}
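
// Round-trip sketch (hypothetical `param`): for the fields carried by
// `PbSinkParam`, `from_proto` and `to_proto` are inverses, which is what lets
// a `SinkParam` cross gRPC boundaries intact:
//
//     let restored = SinkParam::from_proto(param.to_proto());
//     assert_eq!(param, restored);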

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}
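
// Call-site sketch (hypothetical WITH options): resolve the connector name and
// let that connector's `EnforceSecret` implementation vet each property key:
//
//     let props: BTreeMap<String, String> =
//         [("connector".to_owned(), "kafka".to_owned())].into();
//     enforce_secret_sink(&props)?;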

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
        }
    }
}
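
// Per-sink handles are carved out of these global vecs with guarded labels; a
// lookup sketch (hypothetical label values; arity must match registration):
//
//     let labels = ["1", "kafka", "2", "my_sink"]; // actor_id, connector, sink_id, sink_name
//     let commit_duration = GLOBAL_SINK_METRICS
//         .sink_commit_duration
//         .with_guarded_label_values(&labels);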

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}
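
// Note: the `4` in the test constructors above is the label arity, matching the
// four labels (actor_id, connector, sink_id, sink_name) used at registration.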

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1,
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
    )
}

pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;
    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[];
    type LogSinker: LogSinker;
    type Coordinator: SinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                        );
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    #[expect(clippy::unused_async)]
    async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}
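
// A minimal implementation sketch (not part of this module): a no-op sink wired
// through the trait. `NoopLogSinker` is hypothetical; a real one must implement
// `LogSinker` below.
//
//     struct NoopSink;
//
//     impl TryFrom<SinkParam> for NoopSink {
//         type Error = SinkError;
//         fn try_from(_param: SinkParam) -> Result<Self> {
//             Ok(NoopSink)
//         }
//     }
//
//     impl Sink for NoopSink {
//         const SINK_NAME: &'static str = "noop";
//         type LogSinker = NoopLogSinker;
//         type Coordinator = DummySinkCommitCoordinator;
//
//         async fn validate(&self) -> Result<()> {
//             Ok(())
//         }
//
//         async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
//             Ok(NoopLogSinker)
//         }
//     }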

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
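
// A consume-loop sketch for a `LogSinker` (hypothetical writer; variants other
// than chunks and barriers elided). The contract: start the reader, then
// alternate `next_item` with `truncate` once items are durably handled:
//
//     async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
//         log_reader.start_from(None).await?;
//         loop {
//             let (epoch, item) = log_reader.next_item().await?;
//             match item {
//                 LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
//                     // write `chunk` to the external system here, then:
//                     log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
//                 }
//                 LogStoreReadItem::Barrier { .. } => {
//                     // flush/commit on (checkpoint) barriers here, then:
//                     log_reader.truncate(TruncateOffset::Barrier { epoch })?;
//                 }
//                 _ => {}
//             }
//         }
//     }
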
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink commit coordinator and return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// After collecting the metadata from each sink writer, a coordinator will call `commit` with
    /// the set of metadata. The metadata is serialized into bytes because it is expected to be
    /// passed between different gRPC nodes, so this general trait works with serialized bytes.
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}

pub struct DummySinkCommitCoordinator;

#[async_trait]
impl SinkCommitCoordinator for DummySinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
        Ok(())
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // Remove PrivateLink-related properties, if any.
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
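
// Construction sketch (hypothetical `param`): the `connector` entry in the WITH
// options selects the `SinkImpl` variant via `match_sink_name_str!`:
//
//     // `param.properties` contains `"connector" => "blackhole"`.
//     let sink = build_sink(param)?;
//     assert!(sink.is_blackhole());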

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();
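
// After expansion, `SinkImpl` is roughly:
//
//     pub enum SinkImpl {
//         Redis(Box<RedisSink>),
//         Kafka(Box<KafkaSink>),
//         // ... one boxed variant per entry in `for_all_sinks!`,
//         // plus a `From<T> for SinkImpl` impl for each sink type.
//     }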

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<ClickHouseError> for SinkError {
    fn from(value: ClickHouseError) -> Self {
        SinkError::ClickHouse(value.to_report_string())
    }
}

impl From<DeltaTableError> for SinkError {
    fn from(value: DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}