risingwave_connector/sink/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

feature_gated_sink_mod!(big_query, "bigquery");
pub mod boxed;
pub mod catalog;
feature_gated_sink_mod!(clickhouse, ClickHouse, "clickhouse");
pub mod coordinate;
pub mod decouple_checkpoint_log_sink;
feature_gated_sink_mod!(deltalake, DeltaLake, "deltalake");
feature_gated_sink_mod!(doris, "doris");
#[cfg(any(feature = "sink-doris", feature = "sink-starrocks"))]
pub mod doris_starrocks_connector;
pub mod dynamodb;
pub mod elasticsearch_opensearch;
pub mod encoder;
pub mod file_sink;
pub mod formatter;
feature_gated_sink_mod!(google_pubsub, GooglePubSub, "google_pubsub");
pub mod iceberg;
pub mod jdbc_jni_client;
pub mod kafka;
pub mod kinesis;
pub mod log_store;
pub mod mock_coordination_client;
pub mod mongodb;
pub mod mqtt;
pub mod nats;
pub mod postgres;
pub mod pulsar;
pub mod redis;
pub mod remote;
pub mod snowflake_redshift;
pub mod sqlserver;
feature_gated_sink_mod!(starrocks, "starrocks");
pub mod test_sink;
pub mod trivial;
pub mod utils;
pub mod writer;

pub mod prelude {
    pub use crate::sink::{
        Result, SINK_TYPE_APPEND_ONLY, SINK_USER_FORCE_APPEND_ONLY_OPTION,
        SINK_USER_FORCE_COMPACTION, Sink, SinkError, SinkParam, SinkWriterParam,
    };
}

use std::collections::BTreeMap;
use std::future::Future;
use std::sync::{Arc, LazyLock};

use ::redis::RedisError;
use anyhow::anyhow;
use async_trait::async_trait;
use decouple_checkpoint_log_sink::{
    COMMIT_CHECKPOINT_INTERVAL, DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE,
    DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE,
};
use futures::future::BoxFuture;
use opendal::Error as OpendalError;
use prometheus::Registry;
use risingwave_common::array::ArrayError;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, Field, Schema};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::ActorId;
use risingwave_common::metrics::{
    LabelGuardedHistogram, LabelGuardedHistogramVec, LabelGuardedIntCounter,
    LabelGuardedIntCounterVec, LabelGuardedIntGaugeVec,
};
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::{LocalSecretManager, SecretError};
use risingwave_common::session_config::sink_decouple::SinkDecouple;
use risingwave_common::{
    bail, register_guarded_histogram_vec_with_registry,
    register_guarded_int_counter_vec_with_registry,
    register_guarded_int_gauge_vec_with_registry,
};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_pb::id::ExecutorId;
use risingwave_rpc_client::MetaClient;
use risingwave_rpc_client::error::RpcError;
use starrocks::STARROCKS_SINK;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
pub use tracing;

use self::catalog::{SinkFormatDesc, SinkType};
use self::clickhouse::CLICKHOUSE_SINK;
use self::deltalake::DELTALAKE_SINK;
use self::iceberg::ICEBERG_SINK;
use self::mock_coordination_client::{MockMetaClient, SinkCoordinationRpcClientEnum};
use crate::WithPropertiesExt;
use crate::connector_common::IcebergSinkCompactionUpdate;
use crate::error::{ConnectorError, ConnectorResult};
use crate::sink::boxed::{BoxSinglePhaseCoordinator, BoxTwoPhaseCoordinator};
use crate::sink::catalog::desc::SinkDesc;
use crate::sink::catalog::{SinkCatalog, SinkId};
use crate::sink::decouple_checkpoint_log_sink::ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL;
use crate::sink::file_sink::fs::FsSink;
use crate::sink::log_store::{LogReader, LogStoreReadItem, LogStoreResult, TruncateOffset};
use crate::sink::snowflake_redshift::snowflake::SNOWFLAKE_SINK_V2;
use crate::sink::utils::feature_gated_sink_mod;

const BOUNDED_CHANNEL_SIZE: usize = 16;

#[macro_export]
macro_rules! for_all_sinks {
    ($macro:path $(, $arg:tt)*) => {
        $macro! {
            {
                { Redis, $crate::sink::redis::RedisSink, $crate::sink::redis::RedisConfig },
                { Kafka, $crate::sink::kafka::KafkaSink, $crate::sink::kafka::KafkaConfig },
                { Pulsar, $crate::sink::pulsar::PulsarSink, $crate::sink::pulsar::PulsarConfig },
                { BlackHole, $crate::sink::trivial::BlackHoleSink, () },
                { Kinesis, $crate::sink::kinesis::KinesisSink, $crate::sink::kinesis::KinesisSinkConfig },
                { ClickHouse, $crate::sink::clickhouse::ClickHouseSink, $crate::sink::clickhouse::ClickHouseConfig },
                { Iceberg, $crate::sink::iceberg::IcebergSink, $crate::sink::iceberg::IcebergConfig },
                { Mqtt, $crate::sink::mqtt::MqttSink, $crate::sink::mqtt::MqttConfig },
                { GooglePubSub, $crate::sink::google_pubsub::GooglePubSubSink, $crate::sink::google_pubsub::GooglePubSubConfig },
                { Nats, $crate::sink::nats::NatsSink, $crate::sink::nats::NatsConfig },
                { Jdbc, $crate::sink::remote::JdbcSink, () },
                { ElasticSearch, $crate::sink::elasticsearch_opensearch::elasticsearch::ElasticSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::ElasticSearchConfig },
                { Opensearch, $crate::sink::elasticsearch_opensearch::opensearch::OpenSearchSink, $crate::sink::elasticsearch_opensearch::elasticsearch_opensearch_config::OpenSearchConfig },
                { Cassandra, $crate::sink::remote::CassandraSink, () },
                { Doris, $crate::sink::doris::DorisSink, $crate::sink::doris::DorisConfig },
                { Starrocks, $crate::sink::starrocks::StarrocksSink, $crate::sink::starrocks::StarrocksConfig },
                { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>, $crate::sink::file_sink::s3::S3Config },

                { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink>, $crate::sink::file_sink::gcs::GcsConfig },
                { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>, $crate::sink::file_sink::azblob::AzblobConfig },
                { Webhdfs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::webhdfs::WebhdfsSink>, $crate::sink::file_sink::webhdfs::WebhdfsConfig },

                { Fs, $crate::sink::file_sink::opendal_sink::FileSink<FsSink>, $crate::sink::file_sink::fs::FsConfig },
                { SnowflakeV2, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Sink, $crate::sink::snowflake_redshift::snowflake::SnowflakeV2Config },
                { Snowflake, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::SnowflakeSink>, $crate::sink::file_sink::s3::SnowflakeConfig },
                { RedShift, $crate::sink::snowflake_redshift::redshift::RedshiftSink, $crate::sink::snowflake_redshift::redshift::RedShiftConfig },
                { DeltaLake, $crate::sink::deltalake::DeltaLakeSink, $crate::sink::deltalake::DeltaLakeConfig },
                { BigQuery, $crate::sink::big_query::BigQuerySink, $crate::sink::big_query::BigQueryConfig },
                { DynamoDb, $crate::sink::dynamodb::DynamoDbSink, $crate::sink::dynamodb::DynamoDbConfig },
                { Mongodb, $crate::sink::mongodb::MongodbSink, $crate::sink::mongodb::MongodbConfig },
                { SqlServer, $crate::sink::sqlserver::SqlServerSink, $crate::sink::sqlserver::SqlServerConfig },
                { Postgres, $crate::sink::postgres::PostgresSink, $crate::sink::postgres::PostgresConfig },

                { Test, $crate::sink::test_sink::TestSink, () },
                { Table, $crate::sink::trivial::TableSink, () }
            }
            $(,$arg)*
        }
    };
}
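
// Example (illustrative): a macro passed to `for_all_sinks!` receives the whole
// `{ { Variant, SinkType, ConfigType }, ... }` list as its first token-tree
// argument. A hypothetical consumer that counts the registered sinks could look
// like this:
//
//     macro_rules! count_sinks {
//         ({$({ $variant_name:ident, $sink_type:ty, $config_type:ty }),*}) => {
//             pub const SINK_COUNT: usize = [$(stringify!($variant_name)),*].len();
//         };
//     }
//     for_all_sinks! { count_sinks }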

#[macro_export]
macro_rules! generate_config_use_clauses {
    ({$({ $variant_name:ident, $sink_type:ty, $($config_type:tt)+ }), *}) => {
        $(
            $crate::generate_config_use_single! { $($config_type)+ }
        )*
    };
}

#[macro_export]
macro_rules! generate_config_use_single {
    // Skip () config types
    (()) => {};

    // Generate use clause for actual config types
    ($config_type:path) => {
        #[allow(unused_imports)]
        pub(super) use $config_type;
    };
}

// Convenience macro that uses for_all_sinks
#[macro_export]
macro_rules! use_all_sink_configs {
    () => {
        $crate::for_all_sinks! { $crate::generate_config_use_clauses }
    };
}
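
// Example (illustrative): re-export every sink config type into a module with
// a single invocation. The module name `sink_configs` is hypothetical:
//
//     mod sink_configs {
//         crate::use_all_sink_configs!();
//     }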

#[macro_export]
macro_rules! dispatch_sink {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $impl:tt, $sink:tt, $body:tt) => {{
        use $crate::sink::SinkImpl;

        match $impl {
            $(
                SinkImpl::$variant_name($sink) => $body,
            )*
        }
    }};
    ($impl:expr, $sink:ident, $body:expr) => {{
        $crate::for_all_sinks! {$crate::dispatch_sink, {$impl}, $sink, {$body}}
    }};
}
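
// Example (illustrative): dispatch over a `SinkImpl` to reach the concrete
// `Sink` type; the identifier passed as the second argument is bound to the
// boxed sink inside the body (see `SinkImpl::is_coordinated_sink` below):
//
//     let coordinated = dispatch_sink!(sink_impl, sink, sink.is_coordinated_sink());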

#[macro_export]
macro_rules! match_sink_name_str {
    ({$({$variant_name:ident, $sink_type:ty, $config_type:ty}),*}, $name_str:tt, $type_name:ident, $body:tt, $on_other_closure:tt) => {{
        use $crate::sink::Sink;
        match $name_str {
            $(
                <$sink_type>::SINK_NAME => {
                    type $type_name = $sink_type;
                    {
                        $body
                    }
                },
            )*
            other => ($on_other_closure)(other),
        }
    }};
    ($name_str:expr, $type_name:ident, $body:expr, $on_other_closure:expr) => {{
        $crate::for_all_sinks! {$crate::match_sink_name_str, {$name_str}, $type_name, {$body}, {$on_other_closure}}
    }};
}
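
// Example (illustrative): resolve a connector name to its sink type; the type
// alias named by the second argument is usable inside the body, and the
// trailing closure handles unknown names (see `SinkImpl::new` below):
//
//     match_sink_name_str!(
//         "kafka",
//         SinkType,
//         Ok(SinkType::SINK_NAME.to_owned()),
//         |other| Err(SinkError::Config(anyhow!("unsupported sink connector {}", other)))
//     )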

pub const CONNECTOR_TYPE_KEY: &str = "connector";
pub const SINK_TYPE_OPTION: &str = "type";
/// `snapshot = false` corresponds to [`risingwave_pb::stream_plan::StreamScanType::UpstreamOnly`]
pub const SINK_SNAPSHOT_OPTION: &str = "snapshot";
pub const SINK_TYPE_APPEND_ONLY: &str = "append-only";
pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_TYPE_RETRACT: &str = "retract";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";
pub const SINK_USER_FORCE_COMPACTION: &str = "force_compaction";

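// Example (illustrative): these option keys appear in the WITH clause of a
// `CREATE SINK` statement, e.g.
//
//     CREATE SINK s1 FROM mv1 WITH (
//         connector = 'kafka',
//         properties.bootstrap.server = 'broker:9092',
//         topic = 'test_topic',
//         type = 'append-only',
//         force_append_only = 'true'
//     );
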
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
    pub sink_id: SinkId,
    pub sink_name: String,
    pub properties: BTreeMap<String, String>,
    pub columns: Vec<ColumnDesc>,
    /// User-defined primary key indices for upsert sink, if any.
    pub downstream_pk: Option<Vec<usize>>,
    pub sink_type: SinkType,
    pub format_desc: Option<SinkFormatDesc>,
    pub db_name: String,

    /// - For `CREATE SINK ... FROM ...`, the name of the source table.
    /// - For `CREATE SINK ... AS <query>`, the name of the sink itself.
    ///
    /// See also `gen_sink_plan`.
    // TODO(eric): Why do we need these two fields (db_name and sink_from_name)?
    pub sink_from_name: String,
}

impl SinkParam {
    pub fn from_proto(pb_param: PbSinkParam) -> Self {
        let table_schema = pb_param.table_schema.expect("should contain table schema");
        let format_desc = match pb_param.format_desc {
            Some(f) => f.try_into().ok(),
            None => {
                let connector = pb_param.properties.get(CONNECTOR_TYPE_KEY);
                let r#type = pb_param.properties.get(SINK_TYPE_OPTION);
                match (connector, r#type) {
                    (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t).ok().flatten(),
                    _ => None,
                }
            }
        };
        Self {
            sink_id: SinkId::from(pb_param.sink_id),
            sink_name: pb_param.sink_name,
            properties: pb_param.properties,
            columns: table_schema.columns.iter().map(ColumnDesc::from).collect(),
            downstream_pk: if table_schema.pk_indices.is_empty() {
                None
            } else {
                Some(
                    (table_schema.pk_indices.iter())
                        .map(|i| *i as usize)
                        .collect(),
                )
            },
            sink_type: SinkType::from_proto(
                PbSinkType::try_from(pb_param.sink_type).expect("should be able to convert"),
            ),
            format_desc,
            db_name: pb_param.db_name,
            sink_from_name: pb_param.sink_from_name,
        }
    }

    pub fn to_proto(&self) -> PbSinkParam {
        PbSinkParam {
            sink_id: self.sink_id,
            sink_name: self.sink_name.clone(),
            properties: self.properties.clone(),
            table_schema: Some(TableSchema {
                columns: self.columns.iter().map(|col| col.to_protobuf()).collect(),
                pk_indices: (self.downstream_pk.as_ref())
                    .map_or_else(Vec::new, |pk| pk.iter().map(|i| *i as u32).collect()),
            }),
            sink_type: self.sink_type.to_proto().into(),
            format_desc: self.format_desc.as_ref().map(|f| f.to_proto()),
            db_name: self.db_name.clone(),
            sink_from_name: self.sink_from_name.clone(),
        }
    }

    pub fn schema(&self) -> Schema {
        Schema {
            fields: self.columns.iter().map(Field::from).collect(),
        }
    }

    /// Get the downstream primary key indices specified by the user. If not specified, return
    /// an empty vector.
    ///
    /// Prefer directly accessing the `downstream_pk` field, as it uses `None` to represent
    /// unspecified values, making it clearer.
    pub fn downstream_pk_or_empty(&self) -> Vec<usize> {
        self.downstream_pk.clone().unwrap_or_default()
    }

    // `SinkParam` should only be used when there is a secret context.
    // FIXME: Use a new type for `SinkFormatDesc` whose properties contain the filled secrets.
    pub fn fill_secret_for_format_desc(
        format_desc: Option<SinkFormatDesc>,
    ) -> Result<Option<SinkFormatDesc>> {
        match format_desc {
            Some(mut format_desc) => {
                format_desc.options = LocalSecretManager::global()
                    .fill_secrets(format_desc.options, format_desc.secret_refs.clone())?;
                Ok(Some(format_desc))
            }
            None => Ok(None),
        }
    }

    /// Try to convert a `SinkCatalog` to a `SinkParam`, filling the secrets into the properties.
    pub fn try_from_sink_catalog(sink_catalog: SinkCatalog) -> Result<Self> {
        let columns = sink_catalog
            .visible_columns()
            .map(|col| col.column_desc.clone())
            .collect();
        let properties_with_secret = LocalSecretManager::global()
            .fill_secrets(sink_catalog.properties, sink_catalog.secret_refs)?;
        let format_desc_with_secret = Self::fill_secret_for_format_desc(sink_catalog.format_desc)?;
        Ok(Self {
            sink_id: sink_catalog.id,
            sink_name: sink_catalog.name,
            properties: properties_with_secret,
            columns,
            downstream_pk: sink_catalog.downstream_pk,
            sink_type: sink_catalog.sink_type,
            format_desc: format_desc_with_secret,
            db_name: sink_catalog.db_name,
            sink_from_name: sink_catalog.sink_from_name,
        })
    }
}

pub fn enforce_secret_sink(props: &impl WithPropertiesExt) -> ConnectorResult<()> {
    use crate::enforce_secret::EnforceSecret;

    let connector = props
        .get_connector()
        .ok_or_else(|| anyhow!("Must specify 'connector' in WITH clause"))?;
    let key_iter = props.key_iter();
    match_sink_name_str!(
        connector.as_str(),
        PropType,
        PropType::enforce_secret(key_iter),
        |other| bail!("connector '{}' is not supported", other)
    )
}

pub static GLOBAL_SINK_METRICS: LazyLock<SinkMetrics> =
    LazyLock::new(|| SinkMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct SinkMetrics {
    pub sink_commit_duration: LabelGuardedHistogramVec,
    pub connector_sink_rows_received: LabelGuardedIntCounterVec,

    // Log store writer metrics
    pub log_store_first_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_latest_write_epoch: LabelGuardedIntGaugeVec,
    pub log_store_write_rows: LabelGuardedIntCounterVec,

    // Log store reader metrics
    pub log_store_latest_read_epoch: LabelGuardedIntGaugeVec,
    pub log_store_read_rows: LabelGuardedIntCounterVec,
    pub log_store_read_bytes: LabelGuardedIntCounterVec,
    pub log_store_reader_wait_new_future_duration_ns: LabelGuardedIntCounterVec,

    // Iceberg metrics
    pub iceberg_write_qps: LabelGuardedIntCounterVec,
    pub iceberg_write_latency: LabelGuardedHistogramVec,
    pub iceberg_rolling_unflushed_data_file: LabelGuardedIntGaugeVec,
    pub iceberg_position_delete_cache_num: LabelGuardedIntGaugeVec,
    pub iceberg_partition_num: LabelGuardedIntGaugeVec,
    pub iceberg_write_bytes: LabelGuardedIntCounterVec,
    pub iceberg_snapshot_num: LabelGuardedIntGaugeVec,
}

impl SinkMetrics {
    pub fn new(registry: &Registry) -> Self {
        let sink_commit_duration = register_guarded_histogram_vec_with_registry!(
            "sink_commit_duration",
            "Duration of commit op in sink",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let connector_sink_rows_received = register_guarded_int_counter_vec_with_registry!(
            "connector_sink_rows_received",
            "Number of rows received by sink",
            &["actor_id", "connector_type", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_first_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_first_write_epoch",
            "The first write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_write_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_write_epoch",
            "The latest write epoch of log store",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_write_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_write_rows",
            "The write rate of rows",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_latest_read_epoch = register_guarded_int_gauge_vec_with_registry!(
            "log_store_latest_read_epoch",
            "The latest read epoch of log store",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_rows = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_rows",
            "The read rate of rows",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_read_bytes = register_guarded_int_counter_vec_with_registry!(
            "log_store_read_bytes",
            "Total size of chunks read by log reader",
            &["actor_id", "connector", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let log_store_reader_wait_new_future_duration_ns =
            register_guarded_int_counter_vec_with_registry!(
                "log_store_reader_wait_new_future_duration_ns",
                "Accumulated duration of LogReader to wait for next call to create future",
                &["actor_id", "connector", "sink_id", "sink_name"],
                registry
            )
            .unwrap();

        let iceberg_write_qps = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_qps",
            "The qps of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_latency = register_guarded_histogram_vec_with_registry!(
            "iceberg_write_latency",
            "The latency of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_rolling_unflushed_data_file = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_rolling_unflushed_data_file",
            "The unflushed data file count of iceberg rolling writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_position_delete_cache_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_position_delete_cache_num",
            "The delete cache num of iceberg position delete writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_partition_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_partition_num",
            "The partition num of iceberg partition writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_write_bytes = register_guarded_int_counter_vec_with_registry!(
            "iceberg_write_bytes",
            "The write bytes of iceberg writer",
            &["actor_id", "sink_id", "sink_name"],
            registry
        )
        .unwrap();

        let iceberg_snapshot_num = register_guarded_int_gauge_vec_with_registry!(
            "iceberg_snapshot_num",
            "The snapshot number of iceberg table",
            &["sink_name", "catalog_name", "table_name"],
            registry
        )
        .unwrap();

        Self {
            sink_commit_duration,
            connector_sink_rows_received,
            log_store_first_write_epoch,
            log_store_latest_write_epoch,
            log_store_write_rows,
            log_store_latest_read_epoch,
            log_store_read_rows,
            log_store_read_bytes,
            log_store_reader_wait_new_future_duration_ns,
            iceberg_write_qps,
            iceberg_write_latency,
            iceberg_rolling_unflushed_data_file,
            iceberg_position_delete_cache_num,
            iceberg_partition_num,
            iceberg_write_bytes,
            iceberg_snapshot_num,
        }
    }
}

#[derive(Clone)]
pub struct SinkWriterParam {
    // TODO(eric): deprecate executor_id
    pub executor_id: ExecutorId,
    pub vnode_bitmap: Option<Bitmap>,
    pub meta_client: Option<SinkMetaClient>,
    // This value has two effects:
    // 1. It indicates that the sink will accept data chunks with an extra partition value column.
    // 2. It is the index of the extra partition value column.
    // For more detail on the partition value column, see `PartitionComputeInfo`.
    pub extra_partition_col_idx: Option<usize>,

    pub actor_id: ActorId,
    pub sink_id: SinkId,
    pub sink_name: String,
    pub connector: String,
    pub streaming_config: StreamingConfig,
}

#[derive(Clone)]
pub struct SinkWriterMetrics {
    pub sink_commit_duration: LabelGuardedHistogram,
    pub connector_sink_rows_received: LabelGuardedIntCounter,
}

impl SinkWriterMetrics {
    pub fn new(writer_param: &SinkWriterParam) -> Self {
        let labels = [
            &writer_param.actor_id.to_string(),
            writer_param.connector.as_str(),
            &writer_param.sink_id.to_string(),
            writer_param.sink_name.as_str(),
        ];
        let sink_commit_duration = GLOBAL_SINK_METRICS
            .sink_commit_duration
            .with_guarded_label_values(&labels);
        let connector_sink_rows_received = GLOBAL_SINK_METRICS
            .connector_sink_rows_received
            .with_guarded_label_values(&labels);
        Self {
            sink_commit_duration,
            connector_sink_rows_received,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        Self {
            sink_commit_duration: LabelGuardedHistogram::test_histogram::<4>(),
            connector_sink_rows_received: LabelGuardedIntCounter::test_int_counter::<4>(),
        }
    }
}
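
// Example (illustrative): a sink writer typically builds its metrics from the
// `SinkWriterParam` passed to `Sink::new_log_sinker` and bumps them as chunks
// arrive; `chunk` is a hypothetical `StreamChunk`:
//
//     let metrics = SinkWriterMetrics::new(&writer_param);
//     metrics
//         .connector_sink_rows_received
//         .inc_by(chunk.cardinality() as u64);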

#[derive(Clone)]
pub enum SinkMetaClient {
    MetaClient(MetaClient),
    MockMetaClient(MockMetaClient),
}

impl SinkMetaClient {
    pub async fn sink_coordinate_client(&self) -> SinkCoordinationRpcClientEnum {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                SinkCoordinationRpcClientEnum::SinkCoordinationRpcClient(
                    meta_client.sink_coordinate_client().await,
                )
            }
            SinkMetaClient::MockMetaClient(mock_meta_client) => {
                SinkCoordinationRpcClientEnum::MockSinkCoordinationRpcClient(
                    mock_meta_client.sink_coordinate_client(),
                )
            }
        }
    }

    pub async fn add_sink_fail_evet_log(
        &self,
        sink_id: SinkId,
        sink_name: String,
        connector: String,
        error: String,
    ) {
        match self {
            SinkMetaClient::MetaClient(meta_client) => {
                match meta_client
                    .add_sink_fail_evet(sink_id, sink_name, connector, error)
                    .await
                {
                    Ok(_) => {}
                    Err(e) => {
                        tracing::warn!(error = %e.as_report(), %sink_id, "Failed to add sink fail event to event log.");
                    }
                }
            }
            SinkMetaClient::MockMetaClient(_) => {}
        }
    }
}

impl SinkWriterParam {
    pub fn for_test() -> Self {
        SinkWriterParam {
            executor_id: Default::default(),
            vnode_bitmap: Default::default(),
            meta_client: Default::default(),
            extra_partition_col_idx: Default::default(),

            actor_id: 1.into(),
            sink_id: SinkId::new(1),
            sink_name: "test_sink".to_owned(),
            connector: "test_connector".to_owned(),
            streaming_config: StreamingConfig::default(),
        }
    }
}

fn is_sink_support_commit_checkpoint_interval(sink_name: &str) -> bool {
    matches!(
        sink_name,
        ICEBERG_SINK | CLICKHOUSE_SINK | STARROCKS_SINK | DELTALAKE_SINK | SNOWFLAKE_SINK_V2
    )
}

pub trait Sink: TryFrom<SinkParam, Error = SinkError> {
    const SINK_NAME: &'static str;

    type LogSinker: LogSinker;

    fn set_default_commit_checkpoint_interval(
        desc: &mut SinkDesc,
        user_specified: &SinkDecouple,
    ) -> Result<()> {
        if is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
            match desc.properties.get(COMMIT_CHECKPOINT_INTERVAL) {
                Some(commit_checkpoint_interval) => {
                    let commit_checkpoint_interval = commit_checkpoint_interval
                        .parse::<u64>()
                        .map_err(|e| SinkError::Config(anyhow!(e)))?;
                    if matches!(user_specified, SinkDecouple::Disable)
                        && commit_checkpoint_interval > 1
                    {
                        return Err(SinkError::Config(anyhow!(
                            "config conflict: a `commit_checkpoint_interval` larger than 1 requires sink decouple to be enabled, but the session config `sink_decouple` is disabled"
                        )));
                    }
                }
                None => match user_specified {
                    SinkDecouple::Default | SinkDecouple::Enable => {
                        if matches!(Self::SINK_NAME, ICEBERG_SINK) {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                ICEBERG_DEFAULT_COMMIT_CHECKPOINT_INTERVAL.to_string(),
                            );
                        } else {
                            desc.properties.insert(
                                COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                                DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
                            );
                        }
                    }
                    SinkDecouple::Disable => {
                        desc.properties.insert(
                            COMMIT_CHECKPOINT_INTERVAL.to_owned(),
                            DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
                        );
                    }
                },
            }
        }
        Ok(())
    }

    /// `user_specified` is the value of the `sink_decouple` session config.
    fn is_sink_decouple(user_specified: &SinkDecouple) -> Result<bool> {
        match user_specified {
            SinkDecouple::Default | SinkDecouple::Enable => Ok(true),
            SinkDecouple::Disable => Ok(false),
        }
    }

    fn support_schema_change() -> bool {
        false
    }

    fn validate_alter_config(_config: &BTreeMap<String, String>) -> Result<()> {
        Ok(())
    }

    async fn validate(&self) -> Result<()>;
    async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>;

    fn is_coordinated_sink(&self) -> bool {
        false
    }

    async fn new_coordinator(
        &self,
        _iceberg_compact_stat_sender: Option<UnboundedSender<IcebergSinkCompactionUpdate>>,
    ) -> Result<SinkCommitCoordinator> {
        Err(SinkError::Coordinator(anyhow!("no coordinator")))
    }
}
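
// Illustrative sketch of a minimal `Sink` implementation (all `My*` names are
// hypothetical): conversion from `SinkParam` supplies the config, `validate`
// checks connectivity, and `new_log_sinker` builds the `LogSinker` that does
// the actual writing.
//
//     pub struct MySink {
//         param: SinkParam,
//     }
//
//     impl TryFrom<SinkParam> for MySink {
//         type Error = SinkError;
//
//         fn try_from(param: SinkParam) -> Result<Self> {
//             Ok(Self { param })
//         }
//     }
//
//     impl Sink for MySink {
//         const SINK_NAME: &'static str = "my_sink";
//         type LogSinker = MyLogSinker;
//
//         async fn validate(&self) -> Result<()> {
//             Ok(())
//         }
//
//         async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
//             Ok(MyLogSinker)
//         }
//     }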

pub trait SinkLogReader: Send {
    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_;

    /// Emit the next item.
    ///
    /// The implementation should ensure that the future is cancellation safe.
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_;

    /// Mark that all items emitted so far have been consumed and it is safe to truncate the log
    /// from the current offset.
    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()>;
}

impl<R: LogReader> SinkLogReader for &mut R {
    fn next_item(
        &mut self,
    ) -> impl Future<Output = LogStoreResult<(u64, LogStoreReadItem)>> + Send + '_ {
        <R as LogReader>::next_item(*self)
    }

    fn truncate(&mut self, offset: TruncateOffset) -> LogStoreResult<()> {
        <R as LogReader>::truncate(*self, offset)
    }

    fn start_from(
        &mut self,
        start_offset: Option<u64>,
    ) -> impl Future<Output = LogStoreResult<()>> + Send + '_ {
        <R as LogReader>::start_from(*self, start_offset)
    }
}

#[async_trait]
pub trait LogSinker: 'static + Send {
    // Note: please rebuild the log reader's read stream before consuming the log store.
    async fn consume_log_and_sink(self, log_reader: impl SinkLogReader) -> Result<!>;
}
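
// Illustrative sketch (continuing the hypothetical `MyLogSinker` above): a
// `LogSinker` starts the reader, then loops over `next_item`, writing chunks
// and truncating the log once items are durably delivered. Enum fields are
// elided with `..` where not needed.
//
//     pub struct MyLogSinker;
//
//     #[async_trait]
//     impl LogSinker for MyLogSinker {
//         async fn consume_log_and_sink(self, mut log_reader: impl SinkLogReader) -> Result<!> {
//             log_reader.start_from(None).await?;
//             loop {
//                 let (epoch, item) = log_reader.next_item().await?;
//                 match item {
//                     LogStoreReadItem::StreamChunk { chunk_id, .. } => {
//                         // ... write the chunk to the external system ...
//                         log_reader.truncate(TruncateOffset::Chunk { epoch, chunk_id })?;
//                     }
//                     LogStoreReadItem::Barrier { .. } => {
//                         log_reader.truncate(TruncateOffset::Barrier { epoch })?;
//                     }
//                 }
//             }
//         }
//     }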

pub type SinkCommittedEpochSubscriber = Arc<
    dyn Fn(SinkId) -> BoxFuture<'static, Result<(u64, UnboundedReceiver<u64>)>>
        + Send
        + Sync
        + 'static,
>;

pub enum SinkCommitCoordinator {
    SinglePhase(BoxSinglePhaseCoordinator),
    TwoPhase(BoxTwoPhaseCoordinator),
}

#[async_trait]
pub trait SinglePhaseCommitCoordinator {
    /// Initialize the sink committer coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Commit directly using the single-phase strategy.
    async fn commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<()>;
}

#[async_trait]
pub trait TwoPhaseCommitCoordinator {
    /// Initialize the sink committer coordinator.
    async fn init(&mut self) -> Result<()>;

    /// Return serialized commit metadata to be passed to `commit`.
    async fn pre_commit(
        &mut self,
        epoch: u64,
        metadata: Vec<SinkMetadata>,
        add_columns: Option<Vec<Field>>,
    ) -> Result<Vec<u8>>;

    /// An idempotent implementation is required, because `commit` in the same epoch could be called multiple times.
    async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()>;

    /// An idempotent implementation is required, because `abort` in the same epoch could be called multiple times.
    async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>);
}
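
// A plausible call sequence, inferred from the contracts above: the coordinator
// calls `init` once, then for each commit epoch calls `pre_commit` to obtain
// durable metadata followed by `commit` to finalize it; after a failure,
// `commit` or `abort` may be replayed with the same metadata, which is why both
// must be idempotent.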

impl SinkImpl {
    pub fn new(mut param: SinkParam) -> Result<Self> {
        const PRIVATE_LINK_TARGET_KEY: &str = "privatelink.targets";

        // remove privatelink related properties if any
        param.properties.remove(PRIVATE_LINK_TARGET_KEY);

        let sink_type = param
            .properties
            .get(CONNECTOR_TYPE_KEY)
            .ok_or_else(|| SinkError::Config(anyhow!("missing config: {}", CONNECTOR_TYPE_KEY)))?;

        let sink_type = sink_type.to_lowercase();
        match_sink_name_str!(
            sink_type.as_str(),
            SinkType,
            Ok(SinkType::try_from(param)?.into()),
            |other| {
                Err(SinkError::Config(anyhow!(
                    "unsupported sink connector {}",
                    other
                )))
            }
        )
    }

    pub fn is_sink_into_table(&self) -> bool {
        matches!(self, SinkImpl::Table(_))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self, SinkImpl::BlackHole(_))
    }

    pub fn is_coordinated_sink(&self) -> bool {
        dispatch_sink!(self, sink, sink.is_coordinated_sink())
    }
}

pub fn build_sink(param: SinkParam) -> Result<SinkImpl> {
    SinkImpl::new(param)
}
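
// Example (illustrative): build a sink from a `SinkParam` and dispatch on the
// resulting `SinkImpl` to call trait methods on the concrete type:
//
//     let sink = build_sink(param)?;
//     dispatch_sink!(sink, sink, {
//         sink.validate().await?;
//     });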

macro_rules! def_sink_impl {
    () => {
        $crate::for_all_sinks! { def_sink_impl }
    };
    ({ $({ $variant_name:ident, $sink_type:ty, $config_type:ty }),* }) => {
        #[derive(Debug)]
        pub enum SinkImpl {
            $(
                $variant_name(Box<$sink_type>),
            )*
        }

        $(
            impl From<$sink_type> for SinkImpl {
                fn from(sink: $sink_type) -> SinkImpl {
                    SinkImpl::$variant_name(Box::new(sink))
                }
            }
        )*
    };
}

def_sink_impl!();

pub type Result<T> = std::result::Result<T, SinkError>;

#[derive(Error, Debug)]
pub enum SinkError {
    #[error("Kafka error: {0}")]
    Kafka(#[from] rdkafka::error::KafkaError),
    #[error("Kinesis error: {0}")]
    Kinesis(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Remote sink error: {0}")]
    Remote(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Encode error: {0}")]
    Encode(String),
    #[error("Avro error: {0}")]
    Avro(#[from] apache_avro::Error),
    #[error("Iceberg error: {0}")]
    Iceberg(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("config error: {0}")]
    Config(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("coordinator error: {0}")]
    Coordinator(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ClickHouse error: {0}")]
    ClickHouse(String),
    #[error("Redis error: {0}")]
    Redis(String),
    #[error("Mqtt error: {0}")]
    Mqtt(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Nats error: {0}")]
    Nats(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Google Pub/Sub error: {0}")]
    GooglePubSub(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris/Starrocks connect error: {0}")]
    DorisStarrocksConnect(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Doris error: {0}")]
    Doris(String),
    #[error("DeltaLake error: {0}")]
    DeltaLake(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("ElasticSearch/OpenSearch error: {0}")]
    ElasticSearchOpenSearch(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Starrocks error: {0}")]
    Starrocks(String),
    #[error("File error: {0}")]
    File(String),
    #[error("Pulsar error: {0}")]
    Pulsar(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Internal(
        #[from]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("BigQuery error: {0}")]
    BigQuery(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("DynamoDB error: {0}")]
    DynamoDb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("SQL Server error: {0}")]
    SqlServer(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error("Postgres error: {0}")]
    Postgres(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
    #[error(transparent)]
    Connector(
        #[from]
        #[backtrace]
        ConnectorError,
    ),
    #[error("Secret error: {0}")]
    Secret(
        #[from]
        #[backtrace]
        SecretError,
    ),
    #[error("Mongodb error: {0}")]
    Mongodb(
        #[source]
        #[backtrace]
        anyhow::Error,
    ),
}

impl From<sea_orm::DbErr> for SinkError {
    fn from(err: sea_orm::DbErr) -> Self {
        SinkError::Iceberg(anyhow!(err))
    }
}

impl From<OpendalError> for SinkError {
    fn from(error: OpendalError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<parquet::errors::ParquetError> for SinkError {
    fn from(error: parquet::errors::ParquetError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<ArrayError> for SinkError {
    fn from(error: ArrayError) -> Self {
        SinkError::File(error.to_report_string())
    }
}

impl From<RpcError> for SinkError {
    fn from(value: RpcError) -> Self {
        SinkError::Remote(anyhow!(value))
    }
}

impl From<RedisError> for SinkError {
    fn from(value: RedisError) -> Self {
        SinkError::Redis(value.to_report_string())
    }
}

impl From<tiberius::error::Error> for SinkError {
    fn from(err: tiberius::error::Error) -> Self {
        SinkError::SqlServer(anyhow!(err))
    }
}

impl From<::elasticsearch::Error> for SinkError {
    fn from(err: ::elasticsearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<::opensearch::Error> for SinkError {
    fn from(err: ::opensearch::Error) -> Self {
        SinkError::ElasticSearchOpenSearch(anyhow!(err))
    }
}

impl From<tokio_postgres::Error> for SinkError {
    fn from(err: tokio_postgres::Error) -> Self {
        SinkError::Postgres(anyhow!(err))
    }
}