risingwave_connector/sink/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod big_query;
pub mod boxed;
pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
pub mod deltalake;
pub mod doris;
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
pub mod google_pubsub;
pub mod iceberg;
pub mod jdbc_jni_client;
pub mod kafka;
pub mod kinesis;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake_redshift;
pub mod sqlserver;
pub mod starrocks;
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION, Sink, SinkError,
        SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::clickhouse::error::Error as ClickHouseError;
use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use clickhouse::CLICKHOUSE_SINK;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use deltalake::DELTALAKE_SINK;
use futures::future::BoxFuture;
use iceberg::ICEBERG_SINK;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bail;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    register_guarded_histogram_vec_with_registry, register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use sea_orm::DatabaseConnection;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
use crate::sink::writer::SinkWriter;

const BOUNDED_CHANNEL_SIZE: usize = 16;

#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
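
// A hedged sketch of how `for_all_sinks!` drives a callback macro: the callback
// receives the entire `{ { Variant, SinkType, ConfigType }, ... }` list as its
// first argument. The `count_sinks` macro below is hypothetical, written only to
// illustrate the expansion shape, and is compiled in tests only.
#[cfg(test)]
mod for_all_sinks_example {
    macro_rules! count_sinks {
        ({ $({ $variant:ident, $sink:ty, $config:ty }),* }) => {
            /// One entry per sink registered in `for_all_sinks!`.
            const SINK_VARIANT_NAMES: &[&str] = &[$(stringify!($variant)),*];
        };
    }

    crate::for_all_sinks! { count_sinks }

    #[test]
    fn kafka_is_a_registered_sink_variant() {
        assert!(SINK_VARIANT_NAMES.contains(&"Kafka"));
    }
}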

#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}

#[macro_export]
macro_rules! generate_config_use_single {
    // Skip `()` config types.
    (()) => {};

    // Generate a use clause for actual config types.
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}

// Convenience macro that applies `generate_config_use_clauses` to every sink
// registered in `for_all_sinks!`.
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}

#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
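
// Hedged usage sketch: given some `sink_impl: SinkImpl`, a `Sink` trait method can
// be invoked uniformly across all variants, e.g.
//
//     let coordinated = dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink());
//
// which expands to a `match` with one arm per entry in `for_all_sinks!` (see
// `SinkImpl::is_coordinated_sink` below for a real call site).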

#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
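
// Hedged usage sketch, mirroring `SinkImpl::new` below: resolve a connector name
// string to its concrete sink type, with a closure handling unknown names, e.g.
//
//     match_sink_name_str!(
//         name.as_str(),
//         SinkType,
//         Ok(SinkType::SINK_NAME),
//         |other| Err(SinkError::Config(anyhow!("unsupported sink connector {}", other)))
//     )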

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    /// User-defined primary key indices for the upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why do we need these two fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
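        // Prefer the explicit format descriptor from the proto; otherwise, fall
        // back to deriving one from the legacy `connector` + `type` properties.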
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: if table_schema.pk_indices.is_empty() {
                None
            } else {
                Some(
                    (table_schema.pk_indices.iter())
                        .map(|i| *i as usize)
                        .collect(),
                )
            },
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: (self.downstream_pk.as_ref())
                    .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    /// Get the downstream primary key indices specified by the user. If not specified, return
    /// an empty vector.
    ///
    /// Prefer directly accessing the `downstream_pk` field, as it uses `None` to represent
    /// unspecified values, making it clearer.
    pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
        self.downstream_pk.clone().unwrap_or_default()
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` whose properties contain the filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam`, filling the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration that the log reader spends waiting for the next call to create a future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: u64,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: u32,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), sink_id = sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1,
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
    )
}

pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;

    type LogSinker: LogSinker;
    #[expect(deprecated)]
    type Coordinator: SinkCommitCoordinator = NoSinkCommitCoordinator;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: a `commit_checkpoint_interval` larger than 1 requires sink decouple to be enabled, but the session config `sink_decouple` is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        if matches!(Self::SINK_NAME, ICEBERG_SINK) {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            );
                        } else {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                            );
                        }
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }
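
    // Hedged summary of the defaulting above: when `commit_checkpoint_interval` is
    // unset and decoupling is enabled (or left default), iceberg sinks get
    // `ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL` while the other supported sinks
    // get `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE`; when decoupling
    // is disabled, the interval falls back to
    // `DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE`, and an explicitly
    // configured interval larger than 1 is rejected as a config conflict.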

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn support_schema_change() -> bool {
        false
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    async fn new_coordinator(
        &self,
        _db: DatabaseConnection,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<Self::Coordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}
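
// A hedged, test-only sketch of the minimum surface a sink implements: a
// hypothetical `NullSink` that validates trivially and discards everything it
// reads. It relies on the default `Coordinator` associated type and the default
// `new_coordinator`, as recommended for sinks without a coordinator.
#[cfg(test)]
mod sink_trait_example {
    #![allow(dead_code)]

    use async_trait::async_trait;

    use super::{LogSinker, Result, Sink, SinkError, SinkLogReader, SinkParam, SinkWriterParam};

    struct NullSink;

    impl TryFrom<SinkParam> for NullSink {
        type Error = SinkError;

        fn try_from(_param: SinkParam) -> Result<Self> {
            Ok(NullSink)
        }
    }

    impl Sink for NullSink {
        const SINK_NAME: &'static str = "null";

        type LogSinker = NullLogSinker;

        async fn validate(&self) -> Result<()> {
            Ok(())
        }

        async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
            Ok(NullLogSinker)
        }
    }

    struct NullLogSinker;

    #[async_trait]
    impl LogSinker for NullLogSinker {
        async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
            log_reader.start_from(None).await?;
            loop {
                // A real sink would write the item out and eventually `truncate`
                // the log store; this sketch just discards everything it reads.
                let _ = log_reader.next_item().await?;
            }
        }
    }
}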

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: Rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}

pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

#[async_trait]
pub trait SinkCommitCoordinator {
    /// Initialize the sink commit coordinator and return the log store rewind start offset.
    async fn init(&mut self, subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>>;

    /// After collecting the metadata from each sink writer, the coordinator calls `commit` with
    /// the set of metadata. Since the metadata is expected to be passed between different gRPC
    /// nodes, this general trait keeps it as serialized bytes.
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}

/// A placeholder implementation of `SinkCommitCoordinator` for sinks without a coordinator.
///
/// It can never be constructed because it holds a never type, and therefore it's safe to
/// mark all its methods as unreachable.
///
/// The struct is explicitly marked `deprecated` so that developers who accidentally declare it
/// as the associated type `Coordinator` when implementing the `Sink` trait get a warning and
/// can remove the explicit declaration.
///
/// Note: when implementing a sink without a coordinator, don't explicitly write
/// `type Coordinator = NoSinkCommitCoordinator`; just omit the declaration and rely on the
/// default associated type. Likewise, don't explicitly implement `fn new_coordinator(...)`;
/// use the default implementation.
#[deprecated]
pub struct NoSinkCommitCoordinator(!);

#[expect(deprecated)]
#[async_trait]
impl SinkCommitCoordinator for NoSinkCommitCoordinator {
    async fn init(&mut self, _subscriber: SinkCommittedEpochSubscriber) -> Result<Option<u64>> {
        unreachable!()
    }

    async fn commit(
        &mut self,
        _epoch: u64,
        _metadata: Vec<SinkMetadata>,
        _add_columns: Option<Vec<Field>>,
    ) -> Result<()> {
        unreachable!()
    }
}

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // Remove privatelink-related properties, if any.
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
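
// A hedged, test-only sketch of the construction path: `build_sink` reads the
// `connector` property, matches it against each registered `SINK_NAME` via
// `match_sink_name_str!`, and wraps the concrete sink in `SinkImpl`. All field
// values below are arbitrary placeholders.
#[cfg(test)]
mod build_sink_example {
    use std::collections::BTreeMap;

    use super::{Sink, SinkId, SinkParam, SinkType, build_sink};

    #[test]
    fn builds_blackhole_sink_from_connector_property() {
        let param = SinkParam {
            sink_id: SinkId::new(1),
            sink_name: "example_sink".to_owned(),
            properties: BTreeMap::from([("connector".to_owned(), "blackhole".to_owned())]),
            columns: vec![],
            downstream_pk: None,
            sink_type: SinkType::AppendOnly,
            format_desc: None,
            db_name: "example_db".to_owned(),
            sink_from_name: "example_table".to_owned(),
        };
        let sink = build_sink(param).expect("blackhole is a registered connector");
        assert!(sink.is_blackhole());
        // `dispatch_sink!` runs one expression generically over whichever variant
        // was built; `Sink` must be in scope for the method call in the body.
        assert!(!crate::dispatch_sink!(&sink, sink, sink.is_coordinated_sink()));
    }
}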

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<ClickHouseError> for SinkError {
    fn from(value: ClickHouseError) -> Self {
        SinkError::ClickHouse(value.to_report_string())
    }
}

#[cfg(feature = "sink-deltalake")]
impl From<::deltalake::DeltaTableError> for SinkError {
    fn from(value: ::deltalake::DeltaTableError) -> Self {
        SinkError::DeltaLake(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}