risingwave_connector/sink/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod big_query;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
pub mod google_pubsub;
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::clickhouse::error::Error as ClickHouseError;
use ::deltalake::DeltaTableError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use futures::future::BoxFuture;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::UnboundedReceiver;
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::error::ConnectorError;
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::writer::SinkWriter;

const BOUNDED_CHANNEL_SIZE: usize = 16;

#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink },
                { Kafka, $crate::sink::kafka::KafkaSink },
                { Pulsar, $crate::sink::pulsar::PulsarSink },
                { BlackHole, $crate::sink::trivial::BlackHoleSink },
                { Kinesis, $crate::sink::kinesis::KinesisSink },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
                { Iceberg, $crate::sink::iceberg::IcebergSink },
                { Mqtt, $crate::sink::mqtt::MqttSink },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink },
                { Nats, $crate::sink::nats::NatsSink },
                { Jdbc, $crate::sink::remote::JdbcSink },
                // { ElasticSearchJava, $crate::sink::remote::ElasticSearchJavaSink },
                // { OpensearchJava, $crate::sink::remote::OpenSearchJavaSink },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink },
                { Cassandra, $crate::sink::remote::CassandraSink },
                { HttpJava, $crate::sink::remote::HttpJavaSink },
                { Doris, $crate::sink::doris::DorisSink },
                { Starrocks, $crate::sink::starrocks::StarrocksSink },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink> },
                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink> },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink> },
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink> },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink> },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
                { BigQuery, $crate::sink::big_query::BigQuerySink },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink },
                { Mongodb, $crate::sink::mongodb::MongodbSink },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink },
                { Postgres, $crate::sink::postgres::PostgresSink },

                { Test, $crate::sink::test_sink::TestSink },
                { Table, $crate::sink::trivial::TableSink }
            }
            $(,$arg)*
        }
    };
}
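
// Illustrative sketch of the callback pattern (the `count_sinks!` macro below is
// hypothetical and not part of this module): `for_all_sinks!` invokes the given
// macro with the full `{ { VariantName, SinkType }, ... }` list as its first
// argument, followed by any extra arguments, as `dispatch_sink!`,
// `match_sink_name_str!`, and `def_sink_impl!` below demonstrate.
//
//     macro_rules! count_sinks {
//         ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
//             const SINK_COUNT: usize = [$(stringify!($variant_name)),*].len();
//         };
//     }
//     for_all_sinks! { count_sinks }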

#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
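
// Illustrative usage (a sketch, not compiled here): dispatch over a built
// `SinkImpl` to call a `Sink` trait method on the concrete sink type. The
// second argument binds the boxed sink inside each match arm.
//
//     let sink: SinkImpl = build_sink(param)?;
//     dispatch_sink!(sink, sink, { sink.validate().await })?;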

#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
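
// Illustrative usage (a sketch; `SinkImpl::new` below uses this same pattern):
// resolve a connector name string to its concrete sink type, which is aliased
// as the given type identifier within the body.
//
//     match_sink_name_str!(
//         "kafka",
//         SinkType,
//         Ok(std::any::type_name::<SinkType>()),
//         |other| Err(SinkError::Config(anyhow!("unsupported sink connector {}", other)))
//     )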

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
pub const SINK_WITHOUT_BACKFILL: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why do we need these two fields (`db_name` and `sink_from_name`)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: table_schema
                .pk_indices
                .iter()
                .map(|i| *i as usize)
                .collect(),
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` with properties containing filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam` and fill the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}
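
// A minimal sketch (assuming a `param: SinkParam` already in scope): the
// protobuf conversion is intended to be a lossless round trip, which can be
// checked directly since `PartialEq` is derived above.
//
//     let restored = SinkParam::from_proto(param.to_proto());
//     assert_eq!(param, restored);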

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec<4>,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec<4>,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec<3>,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec<3>,
    pub log_store_write_rows: LabelGuardedIntCounterVec<3>,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec<4>,
    pub log_store_read_rows: LabelGuardedIntCounterVec<4>,
    pub log_store_read_bytes: LabelGuardedIntCounterVec<4>,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec<4>,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec<3>,
    pub iceberg_write_latency: LabelGuardedHistogramVec<3>,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec<3>,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec<3>,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec<3>,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec<3>,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram<4>,
    pub connector_sink_rows_received: LabelGuardedIntCounter<4>,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1,
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
    )
}

pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;
    type LogSinker: LogSinker;
    type Coordinator: SinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                        );
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    #[expect(clippy::unused_async)]
    async fn new_coordinator(&self, _db: DatabaseConnection) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}
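
// Minimal implementor sketch (the `MySink` and `MyLogSinker` types are
// hypothetical and not compiled here). A sink without a commit coordinator can
// plug in `DummySinkCommitCoordinator` and rely on the default
// `new_coordinator`:
//
//     pub struct MySink { param: SinkParam }
//
//     impl TryFrom<SinkParam> for MySink {
//         type Error = SinkError;
//         fn try_from(param: SinkParam) -> Result<Self> {
//             Ok(Self { param })
//         }
//     }
//
//     impl Sink for MySink {
//         const SINK_NAME: &'static str = "my_sink";
//         type LogSinker = MyLogSinker;
//         type Coordinator = DummySinkCommitCoordinator;
//
//         async fn validate(&self) -> Result<()> {
//             Ok(())
//         }
//
//         async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
//             Ok(MyLogSinker)
//         }
//     }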

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: please rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
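
// Typical shape of a `LogSinker` implementation (a sketch; variant field lists
// are abbreviated with `..` and the actual write/commit steps are elided):
//
//     async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
//         log_reader.start_from(None).await?;
//         loop {
//             let (epoch, item) = log_reader.next_item().await?;
//             match item {
//                 LogStoreReadItem::StreamChunk { chunk, .. } => {
//                     // write the chunk to the external system
//                 }
//                 LogStoreReadItem::Barrier { is_checkpoint, .. } => {
//                     // flush/commit on checkpoint barriers, then allow the
//                     // log store to reclaim everything up to this epoch
//                     log_reader.truncate(TruncateOffset::Barrier { epoch })?;
//                 }
//                 _ => {}
//             }
//         }
//     }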

pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink committer coordinator and return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// After collecting the metadata from each sink writer, the coordinator will call `commit`
    /// with the set of metadata. The metadata is serialized into bytes because it is expected to
    /// be passed between different gRPC nodes, so in this general trait it is kept as serialized
    /// bytes.
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}

pub struct DummySinkCommitCoordinator;

#[async_trait]
impl SinkCommitCoordinator for DummySinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
        Ok(())
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // Remove privatelink-related properties if any.
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
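
// Illustrative usage (a sketch; the remaining `SinkParam` fields are elided
// for brevity):
//
//     let mut properties = BTreeMap::new();
//     properties.insert("connector".to_owned(), "blackhole".to_owned());
//     let param = SinkParam { properties, /* remaining fields elided */ .. };
//     let sink = build_sink(param)?;
//     assert!(sink.is_blackhole());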

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<ClickHouseError> for SinkError {
    fn from(value: ClickHouseError) -> Self {
        SinkError::ClickHouse(value.to_report_string())
    }
}

impl From<DeltaTableError> for SinkError {
    fn from(value: DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}