risingwave_connector/sink/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod big_query;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
pub mod google_pubsub;
pub mod iceberg;
pub mod jdbc_jni_client;
pub mod kafka;
pub mod kinesis;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake_redshift;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
        SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::clickhouse::error::Error as ClickHouseError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use futures::future::BoxFuture;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
use crate::sink::writer::SinkWriter;

const BOUNDED_CHANNEL_SIZE: usize = 16;
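
/// Central registry of every sink connector as `{ VariantName, SinkType, ConfigType }` entries.
/// The other macros in this file (`dispatch_sink!`, `match_sink_name_str!`, `def_sink_impl!`)
/// consume this list via the callback-macro pattern: `for_all_sinks!` invokes the given `$macro`
/// with the full entry list prepended to any extra arguments.
///
/// An illustrative sketch of the pattern (the `count_sinks` callback is hypothetical, not part of
/// this crate):
///
/// ```ignore
/// macro_rules! count_sinks {
///     ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}) => {
///         [$(stringify!($variant_name)),*].len()
///     };
/// }
/// let num_sinks = for_all_sinks! { count_sinks };
/// ```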
#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },
                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },
                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },
                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}

#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}

#[macro_export]
macro_rules! generate_config_use_single {
    // Skip `()` config types.
    (()) => {};

    // Generate a use clause for actual config types.
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}

// Convenience macro that uses `for_all_sinks!`.
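/// An illustrative usage sketch: invoked at module scope, this re-exports every config type
/// registered in [`for_all_sinks!`], skipping the `()` placeholders (the module name below is
/// illustrative, not part of this crate):
///
/// ```ignore
/// mod sink_configs {
///     risingwave_connector::use_all_sink_configs!();
/// }
/// ```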
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}

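/// Dispatches on a [`SinkImpl`] value, binding the inner concrete sink to `$sink` within `$body`.
///
/// An illustrative sketch, mirroring the call in `SinkImpl::is_coordinated_sink` below:
///
/// ```ignore
/// let coordinated = dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink());
/// ```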
#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}

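/// Matches a connector name string against the `SINK_NAME` of every registered sink and, on a
/// match, binds the concrete sink type to `$type_name` inside `$body`; unknown names fall through
/// to `$on_other_closure`.
///
/// An illustrative sketch that resolves a name to its canonical form:
///
/// ```ignore
/// let canonical = match_sink_name_str!(
///     name.as_str(),
///     SinkType,
///     Ok(SinkType::SINK_NAME),
///     |other| Err(format!("unsupported sink: {}", other))
/// );
/// ```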
#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

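/// Runtime parameters of a sink, carried in the stream plan and convertible to and from
/// [`PbSinkParam`]. An illustrative sketch of the proto round trip (equality holds in the common
/// case, since the struct derives `PartialEq`):
///
/// ```ignore
/// let pb: PbSinkParam = param.to_proto();
/// assert_eq!(SinkParam::from_proto(pb), param);
/// ```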
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    pub downstream_pk: Vec<usize>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why do we need these 2 fields (`db_name` and `sink_from_name`)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: table_schema
                .pk_indices
                .iter()
                .map(|i| *i as usize)
                .collect(),
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: self.downstream_pk.iter().map(|i| *i as u32).collect(),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` with properties containing filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam` and fill the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

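/// An illustrative usage sketch: `props` is any property map implementing [`WithPropertiesExt`]
/// (assumed here to cover the plain string maps used in this crate); the connector name selects
/// the sink whose `EnforceSecret` implementation checks the keys. Property contents are
/// illustrative:
///
/// ```ignore
/// let props: std::collections::BTreeMap<String, String> =
///     [("connector".to_owned(), "kafka".to_owned())].into_iter().collect();
/// enforce_secret_sink(&props)?;
/// ```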
pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate `executor_id`
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It gives the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

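/// Per-writer metric handles, pre-labeled with `(actor_id, connector, sink_id, sink_name)` taken
/// from the [`SinkWriterParam`]. An illustrative usage sketch (the chunk handling is
/// illustrative):
///
/// ```ignore
/// let metrics = SinkWriterMetrics::new(&writer_param);
/// metrics.connector_sink_rows_received.inc_by(chunk.cardinality() as u64);
/// ```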
#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1,
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
    )
}
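
/// The core abstraction implemented by every sink connector: a sink is constructed from a
/// [`SinkParam`] via `TryFrom`, validated, and then turned into a [`LogSinker`] that consumes the
/// log store. A minimal illustrative sketch, assuming a hypothetical `MySink`/`MyLogSinker` pair
/// (not part of this crate; `MySink` would also implement `TryFrom<SinkParam, Error = SinkError>`):
///
/// ```ignore
/// impl Sink for MySink {
///     const SINK_NAME: &'static str = "my_sink";
///     type LogSinker = MyLogSinker;
///
///     async fn validate(&self) -> Result<()> {
///         // e.g. check connectivity and schema compatibility here
///         Ok(())
///     }
///
///     async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
///         Ok(MyLogSinker::new(self.config.clone(), writer_param))
///     }
/// }
/// ```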
pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;

    type LogSinker: LogSinker;
    #[expect(deprecated)]
    type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                        );
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn support_schema_change() -> bool {
        false
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}

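/// An illustrative sketch of the loop a [`LogSinker`] typically drives through a `SinkLogReader`:
/// start the reader, take items one by one, and `truncate` once an item has been durably handled
/// downstream (`write_chunk` is a hypothetical helper, not part of this crate):
///
/// ```ignore
/// log_reader.start_from(None).await?;
/// loop {
///     let (epoch, item) = log_reader.next_item().await?;
///     if let LogStoreReadItem::StreamChunk { chunk, chunk_id } = item {
///         write_chunk(chunk).await?;
///         log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
///     }
/// }
/// ```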
pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;
    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: please rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}

pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

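/// An illustrative sketch of the coordinator lifecycle as driven by the meta node: `init` is
/// called once and may return a log store rewind offset, then `commit` is called once per
/// checkpoint epoch with the metadata collected from all sink writers:
///
/// ```ignore
/// let rewind_offset = coordinator.init(subscriber).await?;
/// coordinator.commit(epoch, metadata_from_writers, None).await?;
/// ```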
#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink commit coordinator and return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;
    /// After collecting the metadata from each sink writer, a coordinator will call `commit` with
    /// the set of metadata. The metadata is expected to be passed between different gRPC nodes,
    /// so in this general trait it is represented as serialized bytes.
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}

/// A placeholder struct of `SinkCommitCoordinator` for sinks without a coordinator.
///
/// It can never be constructed because it holds a never type, and therefore it's safe to
/// mark all its methods as unreachable.
///
/// It is explicitly marked as `deprecated` so that developers who accidentally declare it as the
/// associated type `Coordinator` when implementing the `Sink` trait get a warning and can remove
/// the explicit declaration.
///
/// Note:
///     When implementing a sink without a coordinator, don't explicitly write
///     `type Coordinator = NoSinkCommitCoordinator`; just use the default associated type.
///     Likewise, don't explicitly implement `fn new_coordinator(...)`; use the default
///     implementation.
#[deprecated]
pub struct NoSinkCommitCoordinator(!);

#[expect(deprecated)]
#[async_trait]
impl SinkCommitCoordinator for NoSinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        unreachable!()
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        _metadata: Vec<SinkMetadata>,
        _add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        unreachable!()
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // Remove privatelink-related properties, if any.
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

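/// An illustrative sketch of building a sink: the `connector` property (matched case-insensitively
/// against each sink's `SINK_NAME`) selects the variant, here assumed to be the built-in
/// `blackhole` sink; the remaining `SinkParam` fields are illustrative:
///
/// ```ignore
/// param.properties.insert("connector".to_owned(), "blackhole".to_owned());
/// let sink: SinkImpl = build_sink(param)?;
/// assert!(sink.is_blackhole());
/// ```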
pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<ClickHouseError> for SinkError {
    fn from(value: ClickHouseError) -> Self {
        SinkError::ClickHouse(value.to_report_string())
    }
}

#[cfg(feature = "sink-deltalake")]
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}