risingwave_connector/sink/
mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod big_query;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
pub mod google_pubsub;
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
use risingwave_common::bail;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;
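/// Convenience re-exports for sink connector implementations, so a connector module can pull
/// the common items into scope with a single import, e.g. `use crate::sink::prelude::*;`.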
pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
        SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::clickhouse::error::Error as ClickHouseError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use futures::future::BoxFuture;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::writer::SinkWriter;

const BOUNDED_CHANNEL_SIZE: usize = 16;
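/// Higher-order macro that invokes a caller-supplied macro with the full list of supported
/// `{ Variant, SinkType }` pairs. A minimal illustrative callee (`count_sinks` below is a
/// hypothetical macro, shown only to sketch the expansion shape):
///
/// ```ignore
/// macro_rules! count_sinks {
///     ({ $({ $variant:ident, $sink_type:ty }),* }) => {
///         // One entry per supported sink variant.
///         [$(stringify!($variant)),*].len()
///     };
/// }
/// let num_supported_sinks = for_all_sinks!(count_sinks);
/// ```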
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink },
                { Kafka, $crate::sink::kafka::KafkaSink },
                { Pulsar, $crate::sink::pulsar::PulsarSink },
                { BlackHole, $crate::sink::trivial::BlackHoleSink },
                { Kinesis, $crate::sink::kinesis::KinesisSink },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink },
                { Iceberg, $crate::sink::iceberg::IcebergSink },
                { Mqtt, $crate::sink::mqtt::MqttSink },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink },
                { Nats, $crate::sink::nats::NatsSink },
                { Jdbc, $crate::sink::remote::JdbcSink },
                // { ElasticSearchJava, $crate::sink::remote::ElasticSearchJavaSink },
                // { OpensearchJava, $crate::sink::remote::OpenSearchJavaSink },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink },
                { Cassandra, $crate::sink::remote::CassandraSink },
                { Doris, $crate::sink::doris::DorisSink },
                { Starrocks, $crate::sink::starrocks::StarrocksSink },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>},

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>  },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>},
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>},

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>  },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>},
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink },
                { BigQuery, $crate::sink::big_query::BigQuerySink },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink },
                { Mongodb, $crate::sink::mongodb::MongodbSink },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink },
                { Postgres, $crate::sink::postgres::PostgresSink },

                { Test, $crate::sink::test_sink::TestSink },
                { Table, $crate::sink::trivial::TableSink }
            }
            $(,$arg)*
        }
    };
}

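/// Dispatches over a `SinkImpl` value, binding the concrete sink to the given identifier
/// inside the body. Usage sketch (assuming a `sink_impl: SinkImpl` in scope), mirroring
/// `SinkImpl::is_coordinated_sink` below:
///
/// ```ignore
/// let coordinated = dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink());
/// ```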
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}

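/// Matches a connector name string against `Sink::SINK_NAME` of every supported sink and makes
/// the selected sink type available under a type alias inside the body. Usage sketch, following
/// `SinkImpl::new` below (`connector_name` is an assumed local):
///
/// ```ignore
/// match_sink_name_str!(
///     connector_name.as_str(),
///     SinkType,
///     Ok(SinkType::try_from(param)?.into()),
///     |other| Err(SinkError::Config(anyhow!("unsupported sink connector {}", other)))
/// )
/// ```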
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why need these 2 fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: table_schema
                .pk_indices
                .iter()
                .map(|i| *i as usize)
                .collect(),
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }
    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` whose properties contain filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam` and fill the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // The value has two effects:
    // 1. Indicates that the sink accepts data chunks with an extra partition value column.
    // 2. Gives the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1,
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK
    )
}
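/// The entry-point trait of a sink connector. A minimal implementation sketch
/// (`ConsoleSink` and `ConsoleLogSinker` are hypothetical names used only for illustration;
/// see the `LogSinker` docs below for the matching log sinker sketch):
///
/// ```ignore
/// struct ConsoleSink {
///     param: SinkParam,
/// }
///
/// impl TryFrom<SinkParam> for ConsoleSink {
///     type Error = SinkError;
///
///     fn try_from(param: SinkParam) -> Result<Self> {
///         Ok(Self { param })
///     }
/// }
///
/// impl Sink for ConsoleSink {
///     const SINK_NAME: &'static str = "console";
///
///     type LogSinker = ConsoleLogSinker;
///     type Coordinator = DummySinkCommitCoordinator;
///
///     async fn validate(&self) -> Result<()> {
///         // Check connectivity / schema compatibility here.
///         Ok(())
///     }
///
///     async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
///         Ok(ConsoleLogSinker)
///     }
/// }
/// ```
///
/// A real connector is additionally registered in `for_all_sinks!` above so that it becomes
/// reachable through `SinkImpl` and `build_sink`.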
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;
    const SINK_ALTER_CONFIG_LIST: &'static [&'static str] = &[];
    type LogSinker: LogSinker;
    type Coordinator: SinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                        );
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of `sink_decouple` config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    #[expect(clippy::unused_async)]
    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

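/// A `LogSinker` drives the write loop of a sink: it reads items from the log store and reports
/// progress back through `truncate`. Continuing the hypothetical `ConsoleLogSinker` sketch from
/// the `Sink` trait docs (error handling and any other `LogStoreReadItem` kinds omitted):
///
/// ```ignore
/// struct ConsoleLogSinker;
///
/// #[async_trait]
/// impl LogSinker for ConsoleLogSinker {
///     async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
///         log_reader.start_from(None).await?;
///         loop {
///             let (epoch, item) = log_reader.next_item().await?;
///             match item {
///                 LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
///                     println!("epoch {epoch}: {:?}", chunk);
///                     // Truncate only after the chunk has been durably handled downstream.
///                     log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
///                 }
///                 LogStoreReadItem::Barrier { .. } => {
///                     log_reader.truncate(TruncateOffset::Barrier { epoch })?;
///                 }
///             }
///         }
///     }
/// }
/// ```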
#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink commit coordinator and return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// After collecting the metadata from each sink writer, the coordinator will call `commit`
    /// with the set of metadata. Since the metadata is expected to be passed between different
    /// gRPC nodes, it is kept as serialized bytes in this general trait.
    async fn commit(&mut self, epoch: u64, metadata: Vec<SinkMetadata>) -> Result<()>;
}

pub struct DummySinkCommitCoordinator;

#[async_trait]
impl SinkCommitCoordinator for DummySinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        Ok(None)
    }

    async fn commit(&mut self, _epoch: u64, _metadata: Vec<SinkMetadata>) -> Result<()> {
        Ok(())
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // remove privatelink related properties if any
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

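/// Builds a `SinkImpl` from a `SinkParam` by dispatching on the `connector` property.
/// Usage sketch (assuming an async context and an already prepared `param: SinkParam`):
///
/// ```ignore
/// let sink = build_sink(param)?;
/// dispatch_sink!(sink, sink, sink.validate().await)?;
/// ```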
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<ClickHouseError> for SinkError {
    fn from(value: ClickHouseError) -> Self {
        SinkError::ClickHouse(value.to_report_string())
    }
}

#[cfg(feature = "sink-deltalake")]
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}